Skip to content

Commit

Permalink
Add utils for parsing opus packets and generating PLC. (#15)
Browse files Browse the repository at this point in the history
* Add utils for parsing opus packets and generating PLC.

* ref: Move MediaJsonRecorder impls to own files.

* feat: Detect number of channels from opus stream, improve logging, add a stereo sample.

* Set stereo if any packet in a track has stereo, not just the first one.

* feat: Use packet loss concealment.
  • Loading branch information
bgrozev authored Oct 31, 2024
1 parent 652600d commit 60b15c5
Show file tree
Hide file tree
Showing 13 changed files with 4,099 additions and 118 deletions.
5 changes: 4 additions & 1 deletion src/main/kotlin/org/jitsi/recorder/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import org.jitsi.mediajson.Event
import org.jitsi.utils.logging2.LoggerImpl
import org.jitsi.utils.logging2.createLogger
import java.io.File
import kotlin.time.Duration.Companion.seconds
import org.jitsi.recorder.RecorderMetrics.Companion.instance as metrics
Expand Down Expand Up @@ -87,8 +88,10 @@ fun main(args: Array<String>) {
}

class RecordingSession(val meetingId: String) {
private val logger = createLogger().apply { addContext("meetingId", meetingId) }

private val mediaJsonRecorder = if (Config.recordingFormat == RecordingFormat.MKA) {
MediaJsonMkaRecorder(selectDirectory(meetingId))
MediaJsonMkaRecorder(selectDirectory(meetingId), logger)
} else {
MediaJsonJsonRecorder(selectDirectory(meetingId))
}
Expand Down
41 changes: 41 additions & 0 deletions src/main/kotlin/org/jitsi/recorder/MediaJsonJsonRecorder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Jitsi Multi Track Recorder
*
* Copyright @ 2024-Present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.recorder

import org.jitsi.mediajson.Event
import java.io.BufferedWriter
import java.io.File
import java.io.FileWriter

/**
* Record MediaJson events into a JSON file.
*/
class MediaJsonJsonRecorder(directory: File) : MediaJsonRecorder() {
private val file: File = File(directory, "recording.json")
private val writer: BufferedWriter = BufferedWriter(FileWriter(file, true))

override fun addEvent(event: Event) {
writer.write(event.toJson())
writer.newLine()
writer.flush()
}

override fun stop() {
writer.close()
}
}
158 changes: 158 additions & 0 deletions src/main/kotlin/org/jitsi/recorder/MediaJsonMkaRecorder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Jitsi Multi Track Recorder
*
* Copyright @ 2024-Present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.recorder

import org.jitsi.mediajson.Event
import org.jitsi.mediajson.MediaEvent
import org.jitsi.mediajson.StartEvent
import org.jitsi.recorder.opus.OpusPacket
import org.jitsi.recorder.opus.PacketLossConcealmentInserter
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.queue.PacketQueue
import java.io.File
import java.time.Clock
import kotlin.io.encoding.Base64
import kotlin.io.encoding.ExperimentalEncodingApi

/**
* Record MediaJson events into a Matroska file.
*/
class MediaJsonMkaRecorder(directory: File, parentLogger: Logger) : MediaJsonRecorder() {
private val logger: Logger = parentLogger.createChildLogger(this.javaClass.name)

private val mkaRecorder = MkaRecorder(directory, logger)
private var lastSequenceNumber = -1

val queue = EventQueue {
handleEvent(it)
true
}
private val trackRecorders = mutableMapOf<String, TrackRecorder>()

init {
logger.info("Starting a new recording.")
}

override fun addEvent(event: Event) {
queue.add(event)
}

private fun handleEvent(event: Event) {
val seq = event.assertFormatAndGetSeq()
if (lastSequenceNumber != -1 && lastSequenceNumber + 1 != seq) {
logger.warn("Missing sequence number: $lastSequenceNumber -> $seq")
}
lastSequenceNumber = seq

when (event) {
is StartEvent -> {
logger.info("Starting new track: $event")
if (trackRecorders.containsKey(event.start.tag)) {
logger.warn("Track already exists: ${event.start.tag}")
} else {
trackRecorders[event.start.tag] = TrackRecorder(
mkaRecorder,
event.start.tag,
event.start.customParameters?.endpointId,
logger
)
}
}

is MediaEvent -> {
trackRecorders[event.media.tag]?.addPacket(event) ?: logger.warn("No track for ${event.media.tag}")
}
}
}

override fun stop() {
logger.info("Stopping.")
mkaRecorder.close()
queue.close()
}
}

private class TrackRecorder(
private val mkaRecorder: MkaRecorder,
private val trackName: String,
endpointId: String?,
parentLogger: Logger
) {
private val logger: Logger = parentLogger.createChildLogger(this.javaClass.name).apply {
addContext("track", trackName)
}
private val plcInserter = PacketLossConcealmentInserter(logger)
private var stereo = false

init {
logger.info("Starting new track $trackName")
mkaRecorder.startTrack(trackName, endpointId)
}

@OptIn(ExperimentalEncodingApi::class)
fun addPacket(event: MediaEvent) {
val payload = Base64.decode(event.media.payload)
if (payload.isEmpty()) {
logger.warn("Ignoring empty payload: $event")
return
}
val opusPacket = OpusPacket(payload)

if (!stereo && opusPacket.toc().stereo()) {
stereo = true
mkaRecorder.setTrackChannels(trackName, 2)
logger.info("Setting stereo=true.")
}

plcInserter.add(opusPacket, event.media.timestamp).forEach {
mkaRecorder.addFrame(
trackName,
it.timestampMs,
it.packet.data
)
}
}
}

class EventQueue(packetHandler: (Event) -> Boolean) : PacketQueue<Event>(
100,
false,
"id",
packetHandler,
TaskPools.ioPool,
Clock.systemUTC()
)

/**
* Throw an exception if the event is not in the expected sequence, also extract the sequence number for convenience
* since the field is not in the base [Event] class.
*/
private fun Event.assertFormatAndGetSeq(): Int = when (this) {
is StartEvent -> {
if (start.mediaFormat.encoding != "opus") {
throw IllegalArgumentException("Unsupported media format: ${start.mediaFormat.encoding}")
}
if (start.mediaFormat.sampleRate != 48000) {
throw IllegalArgumentException("Unsupported sample rate: ${start.mediaFormat.sampleRate}")
}
sequenceNumber
}
is MediaEvent -> {
sequenceNumber
}
}
79 changes: 3 additions & 76 deletions src/main/kotlin/org/jitsi/recorder/MediaJsonRecorder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,89 +18,16 @@
package org.jitsi.recorder

import org.jitsi.mediajson.Event
import org.jitsi.mediajson.MediaEvent
import org.jitsi.mediajson.StartEvent
import org.jitsi.utils.logging2.createLogger
import org.jitsi.utils.queue.PacketQueue
import java.io.BufferedWriter
import java.io.File
import java.io.FileWriter
import java.time.Clock
import kotlin.io.encoding.Base64
import kotlin.io.encoding.ExperimentalEncodingApi

/**
* An abstract class that represents a recorder that records MediaJson events.
*/
sealed class MediaJsonRecorder {
abstract fun addEvent(event: Event)
abstract fun stop()
}

class MediaJsonJsonRecorder(directory: File) : MediaJsonRecorder() {
private val file: File = File(directory, "recording.json")
private val writer: BufferedWriter = BufferedWriter(FileWriter(file, true))

override fun addEvent(event: Event) {
writer.write(event.toJson())
writer.newLine()
writer.flush()
}

override fun stop() {
writer.close()
}
}

enum class RecordingFormat {
MKA,
JSON
}

class MediaJsonMkaRecorder(directory: File) : MediaJsonRecorder() {
private val logger = createLogger()

init {
logger.info("Will record in $directory")
}

private val mkaRecorder = MkaRecorder(directory)
val queue = EventQueue {
handleEvent(it)
true
}

override fun addEvent(event: Event) {
queue.add(event)
}

@OptIn(ExperimentalEncodingApi::class)
private fun handleEvent(event: Event) {
when (event) {
// split, buffer, generate silence. thread model? queue. metrics?
is StartEvent -> {
logger.info("Start new stream: $event")
mkaRecorder.startTrack(event.start.tag, event.start.customParameters?.endpointId)
}

is MediaEvent -> {
mkaRecorder.addFrame(
event.media.tag,
event.media.timestamp,
Base64.decode(event.media.payload)
)
}
}
}

override fun stop() {
logger.info("Stopping.")
mkaRecorder.close()
}
}

class EventQueue(packetHandler: (Event) -> Boolean) : PacketQueue<Event>(
100,
false,
"id",
packetHandler,
TaskPools.IO_POOL,
Clock.systemUTC()
)
20 changes: 13 additions & 7 deletions src/main/kotlin/org/jitsi/recorder/MkaRecorder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import org.ebml.matroska.MatroskaFileTagEntry
import org.ebml.matroska.MatroskaFileTrack
import org.ebml.matroska.MatroskaFileTrack.TrackType
import org.ebml.matroska.MatroskaFileWriter
import org.jitsi.utils.logging2.createLogger
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.LoggerImpl
import java.io.File
import java.nio.ByteBuffer

class MkaRecorder(directory: File) {
private val logger = createLogger()
class MkaRecorder(directory: File, parentLogger: Logger = LoggerImpl("MkaRecorder")) {
private val logger: Logger = parentLogger.createChildLogger(this.javaClass.name)
private val destination: File = File(directory, "recording.mka").apply {
logger.info("Writing to $this")
}
Expand All @@ -50,7 +51,7 @@ class MkaRecorder(directory: File) {
defaultDuration = 20_000_000
isFlagLacing = false
audio = MatroskaFileTrack.MatroskaAudioTrack().apply {
channels = 1
channels = 2
samplingFrequency = 48000F
outputSamplingFrequency = 48000F
}
Expand All @@ -74,16 +75,21 @@ class MkaRecorder(directory: File) {
}
}

fun addFrame(trackName: String, timestampRtp: Long, payload: ByteArray) {
fun setTrackChannels(trackName: String, numChannels: Int) {
val track = tracks[trackName] ?: throw Exception("Track not started")
track.audio.channels = numChannels.toShort()
}

fun addFrame(trackName: String, timestampMs: Long, payload: ByteArray) {
val track = tracks[trackName] ?: throw Exception("Track not started")
val frame = MatroskaFileFrame()
frame.data = ByteBuffer.wrap(payload)
frame.trackNo = track.trackNo
if (initialTimestampMs == -1L) {
frame.timecode = 0
initialTimestampMs = timestampRtp / 48
initialTimestampMs = timestampMs
} else {
frame.timecode = (timestampRtp / 48) - initialTimestampMs
frame.timecode = timestampMs - initialTimestampMs
}
logger.debug("Adding frame to track ${track.trackNo} at timecode ${frame.timecode}")
writer.addFrame(frame)
Expand Down
14 changes: 9 additions & 5 deletions src/main/kotlin/org/jitsi/recorder/TaskPools.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

object TaskPools {
/**
* A global executor service which can be used for non-CPU-intensive tasks.
*/
val IO_POOL: ExecutorService =
Executors.newCachedThreadPool(CustomizableThreadFactory("Global IO pool", false))
private val defaultIoPool: ExecutorService =
Executors.newCachedThreadPool(CustomizableThreadFactory("Global IO Pool", false))

@JvmStatic
var ioPool: ExecutorService = defaultIoPool

fun resetIoPool() {
ioPool = defaultIoPool
}
}
Loading

0 comments on commit 60b15c5

Please sign in to comment.