Skip to content

Commit

Permalink
Track recordings (#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn authored Jan 13, 2025
1 parent d74a937 commit e0ff731
Show file tree
Hide file tree
Showing 4 changed files with 381 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lib/ex_webrtc/media/opus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule ExWebRTC.Media.Opus do
Returns the duration in milliseconds.
"""
@spec duration(binary()) :: {:ok, float()} | {:error, term()}
@spec duration(binary()) :: {:ok, number()} | {:error, term()}
def duration(<<config::5, rest::bitstring>>) do
with {:ok, frame_count} <- get_frame_count(rest) do
{:ok, frame_count * get_frame_duration(config)}
Expand Down
196 changes: 196 additions & 0 deletions lib/ex_webrtc/recorder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
defmodule ExWebRTC.Recorder do
@moduledoc """
Saves received RTP packets to a file for later processing/analysis.
Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them.
"""

use GenServer

alias ExWebRTC.MediaStreamTrack

require Logger

@default_base_dir "./recordings"

@typedoc """
Options that can be passed to `start_link/1`.
* `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
* `on_start` - Callback that will be executed just after the Recorder is (re)started.
It should return the initial list of tracks to be added.
"""
@type option ::
{:base_dir, String.t()}
| {:on_start, (-> [MediaStreamTrack.t()])}

@type options :: [option()]

# Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
@doc false
@spec child_spec(list()) :: Supervisor.child_spec()
def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, args}
}
end

@doc """
Starts a new `ExWebRTC.Recorder` process.
`ExWebRTC.Recorder` is a `GenServer` under the hood, thus this function allows for
passing the generic `t:GenServer.options/0` as an argument.
"""
@spec start(options(), GenServer.options()) :: GenServer.on_start()
def start(recorder_opts \\ [], gen_server_opts \\ []) do
GenServer.start(__MODULE__, recorder_opts, gen_server_opts)
end

@doc """
Starts a new `ExWebRTC.Recorder` process.
Works identically to `start/2`, but links to the calling process.
"""
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts)
end

@doc """
Adds new tracks to the recording.
"""
@spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok
def add_tracks(recorder, tracks) do
GenServer.call(recorder, {:add_tracks, tracks})
end

@doc """
Records a received packet on the given track.
"""
@spec record(
GenServer.server(),
MediaStreamTrack.id(),
MediaStreamTrack.rid() | nil,
ExRTP.Packet.t()
) :: :ok
def record(recorder, track_id, rid, %ExRTP.Packet{} = packet) do
recv_time = System.monotonic_time(:millisecond)
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
end

@impl true
def init(config) do
base_dir =
(config[:base_dir] || @default_base_dir)
|> Path.join(current_datetime())
|> Path.expand()

:ok = File.mkdir_p!(base_dir)
Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")

state = %{
base_dir: base_dir,
tracks: %{}
}

case config[:on_start] do
nil ->
{:ok, state}

callback ->
{:ok, state, {:continue, {:on_start, callback}}}
end
end

@impl true
def handle_continue({:on_start, on_start}, state) do
case on_start.() do
[] ->
{:noreply, state}

tracks ->
state = do_add_tracks(tracks, state)
{:noreply, state}
end
end

@impl true
def handle_call({:add_tracks, tracks}, _from, state) do
state = do_add_tracks(tracks, state)
{:reply, :ok, state}
end

@impl true
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
when is_map_key(state.tracks, track_id) do
%{file: file, rid_map: rid_map} = state.tracks[track_id]

case rid_map do
%{^rid => rid_idx} ->
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))

_other ->
Logger.warning("""
Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
""")
end

{:noreply, state}
end

@impl true
def handle_cast({:record, track_id, _rid, _recv_time, _packet}, state) do
Logger.warning("""
Tried to save packet for unknown track id. Ignoring. Track id: #{inspect(track_id)}.\
""")

{:noreply, state}
end

@impl true
def handle_info(_msg, state) do
{:noreply, state}
end

defp do_add_tracks(tracks, state) do
start_time = DateTime.utc_now()

tracks =
Map.new(tracks, fn track ->
path = Path.join(state.base_dir, "#{track.id}.rtpx")
file = File.open!(path, [:write])
rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new()

{track.id,
%{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}}
end)

state = %{state | tracks: Map.merge(state.tracks, tracks)}
report_path = Path.join(state.base_dir, "report.json")

report =
Map.new(state.tracks, fn {id, track} ->
track = Map.delete(track, :file)
{id, track}
end)

:ok = File.write!(report_path, Jason.encode!(report))

%{state | tracks: tracks}
end

defp serialize_packet(packet, rid_idx, recv_time) do
packet = ExRTP.Packet.encode(packet)
packet_size = byte_size(packet)
<<rid_idx::8, recv_time::64, packet_size::32, packet::binary>>
end

defp current_datetime() do
{{y, mo, d}, {h, m, s}} = :calendar.local_time()

# e.g. 20240130-120315
:io_lib.format("~4..0w~2..0w~2..0w-~2..0w~2..0w~2..0w", [y, mo, d, h, m, s])
|> to_string()
end
end
182 changes: 182 additions & 0 deletions lib/ex_webrtc/recorder/converter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
defmodule ExWebRTC.Recorder.Converter do
@moduledoc """
Processes RTP packet files saved by `Recorder`.
At the moment, `Converter` works only with VP8 video and Opus audio.
"""

require Logger

alias ExWebRTC.RTP.JitterBuffer.PacketStore
alias ExWebRTC.RTPCodecParameters
alias ExWebRTC.RTP.Depayloader
alias ExWebRTC.Media.{IVF, Ogg}

# TODO: Allow changing these values
@ivf_header_opts [
# <<fourcc::little-32>> = "VP80"
fourcc: 808_996_950,
height: 720,
width: 1280,
num_frames: 1024,
timebase_denum: 24,
timebase_num: 1
]

# TODO: Support codecs other than VP8/Opus
@video_codec_params %RTPCodecParameters{
payload_type: 96,
mime_type: "video/VP8",
clock_rate: 90_000
}

@audio_codec_params %RTPCodecParameters{
payload_type: 111,
mime_type: "audio/opus",
clock_rate: 48_000,
channels: 2
}

@default_output_path "./converter_output"

@doc """
Convert the saved dumps of tracks in the report to IVF and Ogg files.
"""
@spec convert!(Path.t(), Path.t()) :: :ok | no_return()
def convert!(report_path, output_path \\ @default_output_path) do
report_path =
report_path
|> Path.expand()
|> then(
&if(File.dir?(&1),
do: Path.join(&1, "report.json"),
else: &1
)
)

output_path = Path.expand(output_path)
File.mkdir_p!(output_path)

report =
report_path
|> File.read!()
|> Jason.decode!()

for {id, track} <- report do
%{
"path" => path,
"kind" => kind,
"rid_map" => rid_map
} = track

file = File.open!(path)

packets =
read_packets(file, Map.new(rid_map, fn {_rid, rid_idx} -> {rid_idx, %PacketStore{}} end))

case kind do
"video" ->
convert_video_track(id, rid_map, output_path, packets)

"audio" ->
convert_audio_track(id, output_path, packets |> Map.values() |> hd())
end
end

:ok
end

defp convert_video_track(id, rid_map, output_path, packets) do
for {rid, rid_idx} <- rid_map do
filename = if rid == "nil", do: "#{id}.ivf", else: "#{id}_#{rid}.ivf"

{:ok, writer} =
output_path
|> Path.join(filename)
|> IVF.Writer.open(@ivf_header_opts)

{:ok, depayloader} = Depayloader.new(@video_codec_params)
do_convert_video_track(packets[rid_idx], depayloader, writer)
end
end

defp do_convert_video_track(packets, depayloader, writer, frames_cnt \\ 0)
defp do_convert_video_track([], _depayloader, writer, _frames_cnt), do: IVF.Writer.close(writer)

defp do_convert_video_track([packet | rest], depayloader, writer, frames_cnt) do
case Depayloader.depayload(depayloader, packet) do
{nil, depayloader} ->
do_convert_video_track(rest, depayloader, writer, frames_cnt)

{vp8_frame, depayloader} ->
frame = %IVF.Frame{timestamp: frames_cnt, data: vp8_frame}
{:ok, writer} = IVF.Writer.write_frame(writer, frame)
do_convert_video_track(rest, depayloader, writer, frames_cnt + 1)
end
end

defp convert_audio_track(id, output_path, packets) do
{:ok, writer} =
output_path
|> Path.join("#{id}.ogg")
|> Ogg.Writer.open()

{:ok, depayloader} = Depayloader.new(@audio_codec_params)
do_convert_audio_track(packets, depayloader, writer)
end

defp do_convert_audio_track([], _depayloader, writer), do: Ogg.Writer.close(writer)

defp do_convert_audio_track([packet | rest], depayloader, writer) do
{opus_packet, depayloader} = Depayloader.depayload(depayloader, packet)
{:ok, writer} = Ogg.Writer.write_packet(writer, opus_packet)
do_convert_audio_track(rest, depayloader, writer)
end

defp read_packets(file, stores) do
case read_packet(file) do
{:ok, rid_idx, packet} ->
stores = Map.update!(stores, rid_idx, &insert_packet_to_store(&1, packet))
read_packets(file, stores)

{:error, reason} ->
Logger.warning("Error decoding RTP packet: #{inspect(reason)}")
read_packets(file, stores)

:eof ->
Map.new(stores, fn {rid_idx, store} ->
{rid_idx, store |> PacketStore.dump() |> Enum.reject(&is_nil/1)}
end)
end
end

defp read_packet(file) do
with {:ok, <<rid_idx::8, _recv_time::64, packet_size::32>>} <- read_exactly_n_bytes(file, 13),
{:ok, packet_data} <- read_exactly_n_bytes(file, packet_size),
{:ok, packet} <- ExRTP.Packet.decode(packet_data) do
{:ok, rid_idx, packet}
end
end

defp read_exactly_n_bytes(file, byte_cnt) do
with data when is_binary(data) <- IO.binread(file, byte_cnt),
true <- byte_cnt == byte_size(data) do
{:ok, data}
else
:eof -> :eof
false -> {:error, :not_enough_data}
{:error, _reason} = error -> error
end
end

defp insert_packet_to_store(store, packet) do
case PacketStore.insert(store, packet) do
{:ok, store} ->
store

{:error, :late_packet} ->
Logger.warning("Decoded late RTP packet")
store
end
end
end
Loading

0 comments on commit e0ff731

Please sign in to comment.