Skip to content

Commit

Permalink
Add PeerConneciton.set_packet_loss
Browse files Browse the repository at this point in the history
  • Loading branch information
mickel8 committed Jan 6, 2025
1 parent 2ef1582 commit 04be4ae
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 10 deletions.
41 changes: 32 additions & 9 deletions lib/ex_webrtc/dtls_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ defmodule ExWebRTC.DTLSTransport do
GenServer.cast(dtls_transport, {:send_data, data})
end

@spec set_packet_loss(dtls_transport(), 0..100) :: :ok
def set_packet_loss(dtls_transport, packet_loss) do
GenServer.cast(dtls_transport, {:set_packet_loss, packet_loss})
end

@spec stop(dtls_transport()) :: :ok
def stop(dtls_transport) do
GenServer.stop(dtls_transport)
Expand Down Expand Up @@ -127,7 +132,8 @@ defmodule ExWebRTC.DTLSTransport do
peer_fingerprint: nil,
dtls_state: :new,
dtls: nil,
mode: nil
mode: nil,
packet_loss: 0
}

notify(state.owner, {:state_change, :new})
Expand All @@ -142,7 +148,7 @@ defmodule ExWebRTC.DTLSTransport do
if state.mode == :active do
{packets, timeout} = ExDTLS.do_handshake(state.dtls)
Process.send_after(self(), :dtls_timeout, timeout)
:ok = state.ice_transport.send_data(state.ice_pid, packets)
:ok = do_send(state, packets)
state = update_dtls_state(state, :connecting)
Logger.debug("Started DTLS handshake")
{:reply, :ok, state}
Expand All @@ -157,7 +163,7 @@ defmodule ExWebRTC.DTLSTransport do

if state.buffered_local_packets do
Logger.debug("Sending buffered DTLS packets")
:ok = state.ice_transport.send_data(state.ice_pid, state.buffered_local_packets)
:ok = do_send(state, state.buffered_local_packets)
state = %{state | buffered_local_packets: nil}
{:reply, :ok, state}
else
Expand Down Expand Up @@ -236,7 +242,7 @@ defmodule ExWebRTC.DTLSTransport do
@impl true
def handle_cast({:send_rtp, data}, %{dtls_state: :connected, ice_connected: true} = state) do
case ExLibSRTP.protect(state.out_srtp, data) do
{:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected)
{:ok, protected} -> do_send(state, protected)
{:error, reason} -> Logger.warning("Unable to protect RTP: #{inspect(reason)}")
end

Expand All @@ -252,7 +258,7 @@ defmodule ExWebRTC.DTLSTransport do
@impl true
def handle_cast({:send_rtcp, data}, state) do
case ExLibSRTP.protect_rtcp(state.out_srtp, data) do
{:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected)
{:ok, protected} -> do_send(state, protected)
{:error, reason} -> Logger.warning("Unable to protect RTCP: #{inspect(reason)}")
end

Expand All @@ -262,18 +268,24 @@ defmodule ExWebRTC.DTLSTransport do
@impl true
def handle_cast({:send_data, data}, state) do
case ExDTLS.write_data(state.dtls, data) do
{:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected)
{:ok, protected} -> do_send(state, protected)
{:error, reason} -> Logger.warning("Unable to protect data: #{inspect(reason)}")
end

{:noreply, state}
end

@impl true
def handle_cast({:set_packet_loss, value}, state) do
state = %{state | packet_loss: value}
{:noreply, state}
end

@impl true
def handle_info(:dtls_timeout, %{buffered_local_packets: buffered_local_packets} = state) do
case ExDTLS.handle_timeout(state.dtls) do
{:retransmit, packets, timeout} when state.ice_connected ->
state.ice_transport.send_data(state.ice_pid, packets)
do_send(state, packets)
Logger.debug("Retransmitted DTLS packets")
Process.send_after(self(), :dtls_timeout, timeout)

Expand Down Expand Up @@ -327,7 +339,7 @@ defmodule ExWebRTC.DTLSTransport do
defp handle_ice_data({:data, <<f, _rest::binary>> = data}, state) when f in 20..63 do
case ExDTLS.handle_data(state.dtls, data) do
{:handshake_packets, packets, timeout} when state.ice_connected ->
:ok = state.ice_transport.send_data(state.ice_pid, packets)
:ok = do_send(state, packets)
Process.send_after(self(), :dtls_timeout, timeout)
state = update_dtls_state(state, :connecting)
{:ok, state}
Expand All @@ -346,7 +358,7 @@ defmodule ExWebRTC.DTLSTransport do
{:handshake_finished, lkm, rkm, profile, packets} ->
Logger.debug("DTLS handshake finished")
state = update_remote_cert_info(state)
state.ice_transport.send_data(state.ice_pid, packets)
do_send(state, packets)

peer_fingerprint =
state.dtls
Expand Down Expand Up @@ -466,5 +478,16 @@ defmodule ExWebRTC.DTLSTransport do
%{state | buffered_remote_rtp_packets: []}
end

defp do_send(%{packet_loss: 0} = state, data),
do: state.ice_transport.send_data(state.ice_pid, data)

defp do_send(state, data) do
if Enum.random(1..100) > state.packet_loss do
state.ice_transport.send_data(state.ice_pid, data)
else
:ok
end
end

defp notify(dst, msg), do: send(dst, {:dtls_transport, self(), msg})
end
16 changes: 16 additions & 0 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ defmodule ExWebRTC.PeerConnection do
GenServer.cast(peer_connection, {:send_data, channel_ref, data})
end

@doc """
Sets very simple packet loss.
Can be used for experimental purposes.
"""
@spec set_packet_loss(peer_connection(), 0..100) :: :ok
def set_packet_loss(peer_connection, value) when value in 0..100 do
GenServer.cast(peer_connection, {:set_packet_loss, value})
end

#### MDN-API ####

@doc """
Expand Down Expand Up @@ -1162,6 +1172,12 @@ defmodule ExWebRTC.PeerConnection do
{:noreply, %{state | sctp_transport: sctp_transport}}
end

@impl true
def handle_cast({:set_packet_loss, packet_loss}, state) do
DTLSTransport.set_packet_loss(state.dtls_transport, packet_loss)
{:noreply, state}
end

@impl true
def handle_info({:ex_ice, _from, {:connection_state_change, new_ice_state}}, state) do
state = %{state | ice_state: new_ice_state}
Expand Down
59 changes: 58 additions & 1 deletion test/ex_webrtc/dtls_transport_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ defmodule ExWebRTC.DTLSTransportTest do
|> ExDTLS.get_cert_fingerprint()
|> Utils.hex_dump()

@rtp_header <<1::1, 0::1, 0::1, 0::1, 0::4, 0::1, 96::7, 1::16, 1::32, 1::32>>
@rtp_payload <<0>>
@rtp_packet <<@rtp_header::binary, @rtp_payload::binary>>

# empty rr packet
@rtcp_rr_header <<2::2, 0::1, 0::5, 201::8, 1::16, 1::32>>
@rtcp_rr_packet <<@rtcp_rr_header::binary>>

defmodule MockICETransport do
@behaviour ExWebRTC.ICETransport

Expand Down Expand Up @@ -87,7 +95,7 @@ defmodule ExWebRTC.DTLSTransportTest do
end

test "cannot send data when handshake not finished", %{dtls: dtls} do
DTLSTransport.send_rtp(dtls, <<1, 2, 3>>)
DTLSTransport.send_rtp(dtls, @rtp_packet)

refute_receive {:mock_ice, _data}
end
Expand Down Expand Up @@ -175,6 +183,14 @@ defmodule ExWebRTC.DTLSTransportTest do
assert :ok = check_handshake(dtls, ice_transport, ice_pid, remote_dtls)
assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}}
assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}}

# assert we can send data
assert :ok = DTLSTransport.send_rtp(dtls, @rtp_packet)
assert_receive {:mock_ice, <<@rtp_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
assert_receive {:mock_ice, <<@rtcp_rr_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_data(dtls, <<1, 2, 3>>)
assert_receive {:mock_ice, _datachannel_packet}
end

test "finishes handshake in passive mode", %{
Expand All @@ -200,6 +216,47 @@ defmodule ExWebRTC.DTLSTransportTest do
assert :ok == check_handshake(dtls, ice_transport, ice_pid, remote_dtls)
assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}}
assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}}

# assert we can send rtp packets
assert :ok = DTLSTransport.send_rtp(dtls, @rtp_packet)
assert_receive {:mock_ice, <<@rtp_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
assert_receive {:mock_ice, <<@rtcp_rr_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_data(dtls, <<1, 2, 3>>)
assert_receive {:mock_ice, _datachannel_packet}
end

@tag :debug
test "drops packets when packet loss is set", %{
dtls: dtls,
ice_transport: ice_transport,
ice_pid: ice_pid
} do
:ok = DTLSTransport.start_dtls(dtls, :active, @fingerprint)
remote_dtls = ExDTLS.init(mode: :server, dtls_srtp: true)

:ok = DTLSTransport.set_ice_connected(dtls)

assert :ok = check_handshake(dtls, ice_transport, ice_pid, remote_dtls)
assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}}
assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}}

# assert we can send
DTLSTransport.send_rtp(dtls, @rtp_packet)
assert_receive {:mock_ice, <<@rtp_header::binary, _payload::binary>>}
DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
assert_receive {:mock_ice, <<@rtcp_rr_packet::binary, _rest::binary>>}
DTLSTransport.send_data(dtls, <<1, 2, 3>>)
assert_receive {:mock_ice, _datachannel_packet}

# now set packet-loss
DTLSTransport.set_packet_loss(dtls, 100)
DTLSTransport.send_rtp(dtls, @rtp_packet)
refute_receive {:mock_ice, _rtp_packet}
DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
refute_receive {:mock_ice, _rtcp_rr_packet}
DTLSTransport.send_data(dtls, <<1, 2, 3>>)
refute_receive {:mock_ice, _datachannel_packet}
end

defp check_handshake(dtls, ice_transport, ice_pid, remote_dtls) do
Expand Down

0 comments on commit 04be4ae

Please sign in to comment.