From e0ff731c7000e9f51aaf35cf05343df8d1055047 Mon Sep 17 00:00:00 2001
From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com>
Date: Mon, 13 Jan 2025 11:32:25 +0100
Subject: [PATCH] Track recordings (#180)

---
 lib/ex_webrtc/media/opus.ex         |   2 +-
 lib/ex_webrtc/recorder.ex           | 201 ++++++++++++++++++++++++++++
 lib/ex_webrtc/recorder/converter.ex | 186 ++++++++++++++++++++++++++
 mix.lock                            |   4 +-
 4 files changed, 390 insertions(+), 3 deletions(-)
 create mode 100644 lib/ex_webrtc/recorder.ex
 create mode 100644 lib/ex_webrtc/recorder/converter.ex

diff --git a/lib/ex_webrtc/media/opus.ex b/lib/ex_webrtc/media/opus.ex
index ffe509f4..5fc681b6 100644
--- a/lib/ex_webrtc/media/opus.ex
+++ b/lib/ex_webrtc/media/opus.ex
@@ -7,7 +7,7 @@ defmodule ExWebRTC.Media.Opus do
 
   Returns the duration in milliseconds.
   """
-  @spec duration(binary()) :: {:ok, float()} | {:error, term()}
+  @spec duration(binary()) :: {:ok, number()} | {:error, term()}
   def duration(<<config::5, rest::bitstring>>) do
     with {:ok, frame_count} <- get_frame_count(rest) do
       {:ok, frame_count * get_frame_duration(config)}
diff --git a/lib/ex_webrtc/recorder.ex b/lib/ex_webrtc/recorder.ex
new file mode 100644
index 00000000..6775faba
--- /dev/null
+++ b/lib/ex_webrtc/recorder.ex
@@ -0,0 +1,201 @@
+defmodule ExWebRTC.Recorder do
+  @moduledoc """
+  Saves received RTP packets to a file for later processing/analysis.
+
+  Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them.
+  """
+
+  use GenServer
+
+  alias ExWebRTC.MediaStreamTrack
+
+  require Logger
+
+  @default_base_dir "./recordings"
+
+  @typedoc """
+  Options that can be passed to `start_link/1`.
+
+  * `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
+  * `on_start` - Callback that will be executed just after the Recorder is (re)started.
+  It should return the initial list of tracks to be added.
+  """
+  @type option ::
+          {:base_dir, String.t()}
+          | {:on_start, (-> [MediaStreamTrack.t()])}
+
+  @type options :: [option()]
+
+  # Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
+  @doc false
+  @spec child_spec(list()) :: Supervisor.child_spec()
+  def child_spec(args) do
+    %{
+      id: __MODULE__,
+      start: {__MODULE__, :start_link, args}
+    }
+  end
+
+  @doc """
+  Starts a new `ExWebRTC.Recorder` process.
+
+  `ExWebRTC.Recorder` is a `GenServer` under the hood, thus this function allows for
+  passing the generic `t:GenServer.options/0` as an argument.
+  """
+  @spec start(options(), GenServer.options()) :: GenServer.on_start()
+  def start(recorder_opts \\ [], gen_server_opts \\ []) do
+    GenServer.start(__MODULE__, recorder_opts, gen_server_opts)
+  end
+
+  @doc """
+  Starts a new `ExWebRTC.Recorder` process.
+
+  Works identically to `start/2`, but links to the calling process.
+  """
+  @spec start_link(options(), GenServer.options()) :: GenServer.on_start()
+  def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
+    GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts)
+  end
+
+  @doc """
+  Adds new tracks to the recording.
+  """
+  @spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok
+  def add_tracks(recorder, tracks) do
+    GenServer.call(recorder, {:add_tracks, tracks})
+  end
+
+  @doc """
+  Records a received packet on the given track.
+  """
+  @spec record(
+          GenServer.server(),
+          MediaStreamTrack.id(),
+          MediaStreamTrack.rid() | nil,
+          ExRTP.Packet.t()
+        ) :: :ok
+  def record(recorder, track_id, rid, %ExRTP.Packet{} = packet) do
+    recv_time = System.monotonic_time(:millisecond)
+    GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
+  end
+
+  @impl true
+  def init(config) do
+    base_dir =
+      (config[:base_dir] || @default_base_dir)
+      |> Path.join(current_datetime())
+      |> Path.expand()
+
+    :ok = File.mkdir_p!(base_dir)
+    Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")
+
+    state = %{
+      base_dir: base_dir,
+      tracks: %{}
+    }
+
+    case config[:on_start] do
+      nil ->
+        {:ok, state}
+
+      callback ->
+        {:ok, state, {:continue, {:on_start, callback}}}
+    end
+  end
+
+  @impl true
+  def handle_continue({:on_start, on_start}, state) do
+    case on_start.() do
+      [] ->
+        {:noreply, state}
+
+      tracks ->
+        state = do_add_tracks(tracks, state)
+        {:noreply, state}
+    end
+  end
+
+  @impl true
+  def handle_call({:add_tracks, tracks}, _from, state) do
+    state = do_add_tracks(tracks, state)
+    {:reply, :ok, state}
+  end
+
+  @impl true
+  def handle_cast({:record, track_id, rid, recv_time, packet}, state)
+      when is_map_key(state.tracks, track_id) do
+    %{file: file, rid_map: rid_map} = state.tracks[track_id]
+
+    case rid_map do
+      %{^rid => rid_idx} ->
+        :ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
+
+      _other ->
+        Logger.warning("""
+        Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
+        """)
+    end
+
+    {:noreply, state}
+  end
+
+  @impl true
+  def handle_cast({:record, track_id, _rid, _recv_time, _packet}, state) do
+    Logger.warning("""
+    Tried to save packet for unknown track id. Ignoring. Track id: #{inspect(track_id)}.\
+    """)
+
+    {:noreply, state}
+  end
+
+  @impl true
+  def handle_info(_msg, state) do
+    {:noreply, state}
+  end
+
+  # Opens a dump file for every new track, merges the new entries into the
+  # state and rewrites report.json so that it describes all known tracks.
+  defp do_add_tracks(tracks, state) do
+    start_time = DateTime.utc_now()
+
+    tracks =
+      Map.new(tracks, fn track ->
+        path = Path.join(state.base_dir, "#{track.id}.rtpx")
+        file = File.open!(path, [:write])
+        rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new()
+
+        {track.id,
+         %{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}}
+      end)
+
+    state = %{state | tracks: Map.merge(state.tracks, tracks)}
+    report_path = Path.join(state.base_dir, "report.json")
+
+    report =
+      Map.new(state.tracks, fn {id, track} ->
+        track = Map.delete(track, :file)
+        {id, track}
+      end)
+
+    :ok = File.write!(report_path, Jason.encode!(report))
+
+    # Return the merged state. Returning only `tracks` would drop every track
+    # added by an earlier call, so their packets would be rejected as unknown.
+    state
+  end
+
+  defp serialize_packet(packet, rid_idx, recv_time) do
+    packet = ExRTP.Packet.encode(packet)
+    packet_size = byte_size(packet)
+    # Frame layout: 13-byte header (rid index, receive time, packet size), then the packet.
+    <<rid_idx::8, recv_time::64, packet_size::32, packet::binary>>
+  end
+
+  defp current_datetime() do
+    {{y, mo, d}, {h, m, s}} = :calendar.local_time()
+
+    # e.g. 20240130-120315
+    :io_lib.format("~4..0w~2..0w~2..0w-~2..0w~2..0w~2..0w", [y, mo, d, h, m, s])
+    |> to_string()
+  end
+end
diff --git a/lib/ex_webrtc/recorder/converter.ex b/lib/ex_webrtc/recorder/converter.ex
new file mode 100644
index 00000000..8a404834
--- /dev/null
+++ b/lib/ex_webrtc/recorder/converter.ex
@@ -0,0 +1,186 @@
+defmodule ExWebRTC.Recorder.Converter do
+  @moduledoc """
+  Processes RTP packet files saved by `Recorder`.
+
+  At the moment, `Converter` works only with VP8 video and Opus audio.
+  """
+
+  require Logger
+
+  alias ExWebRTC.RTP.JitterBuffer.PacketStore
+  alias ExWebRTC.RTPCodecParameters
+  alias ExWebRTC.RTP.Depayloader
+  alias ExWebRTC.Media.{IVF, Ogg}
+
+  # TODO: Allow changing these values
+  @ivf_header_opts [
+    # <<fourcc::little-32>> = "VP80"
+    fourcc: 808_996_950,
+    height: 720,
+    width: 1280,
+    num_frames: 1024,
+    timebase_denum: 24,
+    timebase_num: 1
+  ]
+
+  # TODO: Support codecs other than VP8/Opus
+  @video_codec_params %RTPCodecParameters{
+    payload_type: 96,
+    mime_type: "video/VP8",
+    clock_rate: 90_000
+  }
+
+  @audio_codec_params %RTPCodecParameters{
+    payload_type: 111,
+    mime_type: "audio/opus",
+    clock_rate: 48_000,
+    channels: 2
+  }
+
+  @default_output_path "./converter_output"
+
+  @doc """
+  Convert the saved dumps of tracks in the report to IVF and Ogg files.
+  """
+  @spec convert!(Path.t(), Path.t()) :: :ok | no_return()
+  def convert!(report_path, output_path \\ @default_output_path) do
+    report_path =
+      report_path
+      |> Path.expand()
+      |> then(
+        &if(File.dir?(&1),
+          do: Path.join(&1, "report.json"),
+          else: &1
+        )
+      )
+
+    output_path = Path.expand(output_path)
+    File.mkdir_p!(output_path)
+
+    report =
+      report_path
+      |> File.read!()
+      |> Jason.decode!()
+
+    for {id, track} <- report do
+      %{
+        "path" => path,
+        "kind" => kind,
+        "rid_map" => rid_map
+      } = track
+
+      file = File.open!(path)
+
+      packets =
+        read_packets(file, Map.new(rid_map, fn {_rid, rid_idx} -> {rid_idx, %PacketStore{}} end))
+
+      # Every packet is in memory now; release the handle instead of leaking one per track.
+      :ok = File.close(file)
+
+      case kind do
+        "video" ->
+          convert_video_track(id, rid_map, output_path, packets)
+
+        "audio" ->
+          convert_audio_track(id, output_path, packets |> Map.values() |> hd())
+      end
+    end
+
+    :ok
+  end
+
+  defp convert_video_track(id, rid_map, output_path, packets) do
+    for {rid, rid_idx} <- rid_map do
+      filename = if rid == "nil", do: "#{id}.ivf", else: "#{id}_#{rid}.ivf"
+
+      {:ok, writer} =
+        output_path
+        |> Path.join(filename)
+        |> IVF.Writer.open(@ivf_header_opts)
+
+      {:ok, depayloader} = Depayloader.new(@video_codec_params)
+      do_convert_video_track(packets[rid_idx], depayloader, writer)
+    end
+  end
+
+  defp do_convert_video_track(packets, depayloader, writer, frames_cnt \\ 0)
+  defp do_convert_video_track([], _depayloader, writer, _frames_cnt), do: IVF.Writer.close(writer)
+
+  defp do_convert_video_track([packet | rest], depayloader, writer, frames_cnt) do
+    case Depayloader.depayload(depayloader, packet) do
+      {nil, depayloader} ->
+        do_convert_video_track(rest, depayloader, writer, frames_cnt)
+
+      {vp8_frame, depayloader} ->
+        frame = %IVF.Frame{timestamp: frames_cnt, data: vp8_frame}
+        {:ok, writer} = IVF.Writer.write_frame(writer, frame)
+        do_convert_video_track(rest, depayloader, writer, frames_cnt + 1)
+    end
+  end
+
+  defp convert_audio_track(id, output_path, packets) do
+    {:ok, writer} =
+      output_path
+      |> Path.join("#{id}.ogg")
+      |> Ogg.Writer.open()
+
+    {:ok, depayloader} = Depayloader.new(@audio_codec_params)
+    do_convert_audio_track(packets, depayloader, writer)
+  end
+
+  defp do_convert_audio_track([], _depayloader, writer), do: Ogg.Writer.close(writer)
+
+  defp do_convert_audio_track([packet | rest], depayloader, writer) do
+    {opus_packet, depayloader} = Depayloader.depayload(depayloader, packet)
+    {:ok, writer} = Ogg.Writer.write_packet(writer, opus_packet)
+    do_convert_audio_track(rest, depayloader, writer)
+  end
+
+  defp read_packets(file, stores) do
+    case read_packet(file) do
+      {:ok, rid_idx, packet} ->
+        stores = Map.update!(stores, rid_idx, &insert_packet_to_store(&1, packet))
+        read_packets(file, stores)
+
+      {:error, reason} ->
+        Logger.warning("Error decoding RTP packet: #{inspect(reason)}")
+        read_packets(file, stores)
+
+      :eof ->
+        Map.new(stores, fn {rid_idx, store} ->
+          {rid_idx, store |> PacketStore.dump() |> Enum.reject(&is_nil/1)}
+        end)
+    end
+  end
+
+  defp read_packet(file) do
+    # Each record starts with a 13-byte header: rid index, receive time and packet size.
+    with {:ok, <<rid_idx::8, _recv_time::64, packet_size::32>>} <- read_exactly_n_bytes(file, 13),
+         {:ok, packet_data} <- read_exactly_n_bytes(file, packet_size),
+         {:ok, packet} <- ExRTP.Packet.decode(packet_data) do
+      {:ok, rid_idx, packet}
+    end
+  end
+
+  defp read_exactly_n_bytes(file, byte_cnt) do
+    with data when is_binary(data) <- IO.binread(file, byte_cnt),
+         true <- byte_cnt == byte_size(data) do
+      {:ok, data}
+    else
+      :eof -> :eof
+      false -> {:error, :not_enough_data}
+      {:error, _reason} = error -> error
+    end
+  end
+
+  defp insert_packet_to_store(store, packet) do
+    case PacketStore.insert(store, packet) do
+      {:ok, store} ->
+        store
+
+      {:error, :late_packet} ->
+        Logger.warning("Decoded late RTP packet")
+        store
+    end
+  end
+end
diff --git a/mix.lock b/mix.lock
index 2f8a248e..23b0a9b3 100644
--- a/mix.lock
+++ b/mix.lock
@@ -12,7 +12,7 @@
   "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
   "ex_doc": {:hex, :ex_doc, "0.31.2", "8b06d0a5ac69e1a54df35519c951f1f44a7b7ca9a5bb7a260cd8a174d6322ece", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "317346c14febaba9ca40fd97b5b5919f7751fb85d399cc8e7e8872049f37e0af"},
   "ex_dtls": {:hex, :ex_dtls, "0.16.0", "3ae38025ccc77f6db573e2e391602fa9bbc02253c137d8d2d59469a66cbe806b", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2a4e30d74c6ddf95cc5b796423293c06a0da295454c3823819808ff031b4b361"},
-  "ex_ice": {:hex, :ex_ice, "0.9.0", "d1a7e31b9cc52faf668f001f870344d3f9094955bafb6af62d84b7b4c2dd6b36", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.2.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f256faeb9cc5409d2177e68918198c7ef64372a729c8e1590699546554419aa"},
+  "ex_ice": {:hex, :ex_ice, "0.9.2", "7f5513416a8fe33b36d988dd30d6bb79ddd7cfa408e09e2e3d3e3a97e075614d", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.2.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "70f688582dbe36a82cf8bbedf5adb2f0b89996620e229213bd7ff9a9b642e571"},
   "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
   "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
   "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
@@ -23,7 +23,7 @@
   "excoveralls": {:hex, :excoveralls, "0.18.3", "bca47a24d69a3179951f51f1db6d3ed63bca9017f476fe520eb78602d45f7756", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "746f404fcd09d5029f1b211739afb8fb8575d775b21f6a3908e7ce3e640724c6"},
   "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
   "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"},
-  "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"},
+  "hpax": {:hex, :hpax, "1.0.1", "c857057f89e8bd71d97d9042e009df2a42705d6d690d54eca84c8b29af0787b0", [:mix], [], "hexpm", "4e2d5a4f76ae1e3048f35ae7adb1641c36265510a2d4638157fbcb53dda38445"},
   "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
   "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
   "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},