From 8f3da2cfb1e85f570d81ced1c1dae16e5cdfb6ac Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Fri, 14 Jan 2022 12:15:31 +0100 Subject: [PATCH 01/10] make parser for twcc feedbacks, improve encoding twcc feedbacks --- .../rtcp/transport_feedback_packet/twcc.ex | 191 +++++++++++------- 1 file changed, 116 insertions(+), 75 deletions(-) diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index 8214f1b3..3b8292cd 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -1,6 +1,6 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do @moduledoc """ - Serializes [Transport-wide congestion control](https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01) + Encodes and decodes [Transport-wide congestion control](https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01) feedback packets. """ @behaviour Membrane.RTCP.TransportFeedbackPacket @@ -50,24 +50,26 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do @status_vector_symbol_2_bit_id 1 @status_vector_capacity 7 - @packet_status_code %{ - not_received: 0, - small_delta: 1, - large_or_negative_delta: 2, - reserved: 3 - } + @packet_status_flags BiMap.new(%{ + not_received: 0, + small_delta: 1, + large_or_negative_delta: 2, + reserved: 3 + }) @impl true - def decode(binary) do - <> = binary + def decode( + <> + ) do + receive_deltas = parse_feedback(payload, packet_status_count) {:ok, %__MODULE__{ base_seq_num: base_seq_num, - reference_time: reference_time * Time.milliseconds(64), + reference_time: Time.milliseconds(reference_time) * 64, packet_status_count: packet_status_count, - receive_deltas: [], + receive_deltas: receive_deltas, feedback_packet_count: feedback_packet_count }} end @@ -89,11 +91,85 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do # https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01#section-3.1 reference_time = div(reference_time, Time.milliseconds(64)) - payload = - <> <> - encode_packet_status(packet_status_chunks) <> encode_receive_deltas(scaled_receive_deltas) + encoded_header = + <> - maybe_add_padding(payload) + encoded_packet_status_chunks = + Enum.map_join(packet_status_chunks, &encode_packet_status_chunk/1) + + encoded_receive_deltas = Enum.map_join(scaled_receive_deltas, &encode_receive_delta/1) + + [encoded_header, encoded_packet_status_chunks, encoded_receive_deltas] + |> Enum.join() + |> maybe_add_padding() + end + + defp parse_feedback(payload, packet_status_count) do + {receive_deltas, parsed_packet_status} = parse_packet_status(payload, packet_status_count, []) + + parse_receive_deltas(receive_deltas, parsed_packet_status, []) + end + + defp parse_packet_status(binary, packets_left, parsed_status) when packets_left <= 0 do + # note about incomplete vectors: the draft does not specify this, but libwebrtc can make the last + # status vector incomplete, filling the untaken slots with 0s - we may need to drop them + parsed_status = Enum.drop(parsed_status, packets_left) + + {binary, parsed_status} + end + + defp parse_packet_status( + <<@run_length_id::1, packet_status::2, run_length::unsigned-integer-size(13), + rest::binary>>, + packets_left, + parsed_status + ) do + new_status = + @packet_status_flags + |> BiMap.fetch_key!(packet_status) + |> List.duplicate(run_length) + + parse_packet_status(rest, packets_left - run_length, parsed_status ++ new_status) + end + + defp parse_packet_status( + <<@status_vector_id::1, vector_type::1, symbol_list::bits-size(14), rest::binary>>, + packets_left, + parsed_status + ) do + # vector_type = 0 -> 14 symbols, 1 bit each + # vector_type = 1 -> 7 symbols, 2 bits each + symbol_size = vector_type + 1 + + # note about 1-bit symbols: the draft does not specify this, + # but libwebrtc treats <<1::1>> as a "packet received, small delta" status + new_status = + for <<(<> <- symbol_list)>>, + do: BiMap.fetch_key!(@packet_status_flags, symbol) + + parse_packet_status(rest, packets_left - div(14, symbol_size), parsed_status ++ new_status) + end + + defp parse_receive_deltas(_padding, [], parsed_deltas), do: Enum.reverse(parsed_deltas) + + defp parse_receive_deltas( + <>, + [:small_delta | rest_status], + parsed_deltas + ) do + parse_receive_deltas(rest, rest_status, [Time.microseconds(delta) * 250 | parsed_deltas]) + end + + defp parse_receive_deltas( + <>, + [:large_or_negative_delta | rest_status], + parsed_deltas + ) do + parse_receive_deltas(rest, rest_status, [Time.microseconds(delta) * 250 | parsed_deltas]) + end + + defp parse_receive_deltas(binary, [_not_received_or_reserved | rest_status], parsed_deltas) do + parse_receive_deltas(binary, rest_status, [:not_received | parsed_deltas]) end defp make_packet_status_chunks(scaled_receive_deltas) do @@ -124,71 +200,42 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do end) end - defp encode_receive_deltas(scaled_receive_deltas) do - Enum.map_join(scaled_receive_deltas, fn delta -> - case delta_to_packet_status(delta) do - :not_received -> - <<>> + defp encode_receive_delta(scaled_delta) do + case delta_to_packet_status(scaled_delta) do + :not_received -> + Membrane.Logger.warn("Reporting a non-received packet") + <<>> - :small_delta -> - <> + :small_delta -> + <> - :large_or_negative_delta -> - Membrane.Logger.warn( - "Reporting a packet with large or negative delta: (#{inspect(delta / 4)}ms)" - ) + :large_or_negative_delta -> + Membrane.Logger.warn( + "Reporting a packet with large or negative delta: (#{inspect(scaled_delta / 4)}ms)" + ) - <> - end - end) + <> + end end - defp encode_packet_status(packet_status_chunks) do - Enum.map_join(packet_status_chunks, fn chunk -> - case chunk do - %RunLength{} -> - encode_run_length(chunk) - - %StatusVector{packet_count: @status_vector_capacity} -> - encode_status_vector(chunk) - - %StatusVector{} -> - chunk - |> status_vector_to_run_length() - |> Enum.map_join(&encode_run_length/1) - end - end) - end + defp encode_packet_status_chunk(%RunLength{packet_status: status, packet_count: packet_count}), + do: <<@run_length_id::1, BiMap.fetch!(@packet_status_flags, status)::2, packet_count::13>> - defp encode_run_length(%RunLength{packet_status: status, packet_count: count}), - do: <<@run_length_id::1, @packet_status_code[status]::2, count::13>> + defp encode_packet_status_chunk(%StatusVector{vector: vector, packet_count: packet_count}) do + # we use 2-bit symbols, so padding size for an incomplete vector is 2*(number of unfilled slots) bits + padding_size = 2 * (@status_vector_capacity - packet_count) - defp encode_status_vector(%StatusVector{vector: vector, packet_count: @status_vector_capacity}) do symbol_list = Enum.reduce(vector, <<>>, fn status, acc -> - <> + <> end) - <<(<<@status_vector_id::1, @status_vector_symbol_2_bit_id::1>>)::bitstring, - symbol_list::bitstring>> + <<@status_vector_id::1, @status_vector_symbol_2_bit_id::1, symbol_list::bitstring, + 0::size(padding_size)>> end defp run_length_to_status_vector(%RunLength{packet_status: status, packet_count: count}), - do: %StatusVector{vector: Enum.map(1..count, fn _i -> status end), packet_count: count} - - defp status_vector_to_run_length(%StatusVector{vector: vector}) do - vector - |> Enum.reduce([], fn status, acc -> - case acc do - [%RunLength{packet_status: ^status, packet_count: count} | rest] -> - [%RunLength{packet_status: status, packet_count: count + 1} | rest] - - _empty_acc_or_other_status -> - [%RunLength{packet_status: status, packet_count: 1} | acc] - end - end) - |> Enum.reverse() - end + do: %StatusVector{vector: List.duplicate(status, count), packet_count: count} defp scale_delta(:not_received), do: :not_received @@ -215,13 +262,7 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do end defp maybe_add_padding(payload) do - bits_remaining = rem(bit_size(payload), 32) - - if bits_remaining > 0 do - padding_size = 32 - bits_remaining - <> - else - payload - end + padding_size = 32 - rem(bit_size(payload), 32) + <> end end From 57e00ca434c0765a5a3ce43814b4f14d9aaafad9 Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Fri, 14 Jan 2022 12:29:09 +0100 Subject: [PATCH 02/10] fix padding --- .../rtcp/transport_feedback_packet/twcc.ex | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index 3b8292cd..dde99cec 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -222,14 +222,14 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do do: <<@run_length_id::1, BiMap.fetch!(@packet_status_flags, status)::2, packet_count::13>> defp encode_packet_status_chunk(%StatusVector{vector: vector, packet_count: packet_count}) do - # we use 2-bit symbols, so padding size for an incomplete vector is 2*(number of unfilled slots) bits - padding_size = 2 * (@status_vector_capacity - packet_count) - symbol_list = Enum.reduce(vector, <<>>, fn status, acc -> <> end) + # we use 2-bit symbols, so padding size for an incomplete vector is 2*(number of unfilled slots) bits + padding_size = 2 * (@status_vector_capacity - packet_count) + <<@status_vector_id::1, @status_vector_symbol_2_bit_id::1, symbol_list::bitstring, 0::size(padding_size)>> end @@ -262,7 +262,13 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do end defp maybe_add_padding(payload) do - padding_size = 32 - rem(bit_size(payload), 32) - <> + bytes_remaining = rem(byte_size(payload), 4) + + if bytes_remaining > 0 do + padding_size = 4 - bytes_remaining + <> + else + payload + end end end From d0d38f16302aaaaca5adfd0ba09298b25c8f3e90 Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Fri, 14 Jan 2022 13:11:12 +0100 Subject: [PATCH 03/10] add TWCC sender element --- lib/membrane/rtp/session_bin.ex | 43 ++++++++++++++++---- lib/membrane/rtp/twcc_receiver.ex | 10 ++--- lib/membrane/rtp/twcc_sender.ex | 52 ++++++++++++++++++++++++ test/membrane/rtp/twcc_receiver_test.exs | 6 +-- 4 files changed, 95 insertions(+), 16 deletions(-) create mode 100644 lib/membrane/rtp/twcc_sender.ex diff --git a/lib/membrane/rtp/session_bin.ex b/lib/membrane/rtp/session_bin.ex index fbeebdd0..f2e4733d 100644 --- a/lib/membrane/rtp/session_bin.ex +++ b/lib/membrane/rtp/session_bin.ex @@ -396,12 +396,13 @@ defmodule Membrane.RTP.SessionBin do } } - {rtp_extensions, maybe_link_twcc, state} = maybe_handle_twcc(rtp_extensions, ssrc, ctx, state) + {rtp_extensions, maybe_link_twcc_receiver, state} = + maybe_handle_twcc_receiver(rtp_extensions, ssrc, ctx, state) router_link = link(:ssrc_router) |> via_out(Pad.ref(:output, ssrc)) - |> then(maybe_link_twcc) + |> then(maybe_link_twcc_receiver) |> to(rtp_stream_name) acc = {new_children, router_link} @@ -465,8 +466,11 @@ defmodule Membrane.RTP.SessionBin do maybe_link_encryptor = &to(&1, {:srtp_encryptor, ssrc}, %SRTP.Encryptor{policies: state.srtp_policies}) + maybe_link_twcc_sender = maybe_handle_twcc_sender(ssrc, ctx) + links = [ link_bin_input(input_pad) + |> then(maybe_link_twcc_sender) |> to({:stream_send_bin, ssrc}, %RTP.StreamSendBin{ ssrc: ssrc, payload_type: payload_type, @@ -601,13 +605,13 @@ defmodule Membrane.RTP.SessionBin do raise "Cannot find default RTP payload type for encoding #{encoding}" end - defp maybe_handle_twcc(rtp_extensions, pad_ssrc, ctx, state) do - # workaround: as TWCC is a transport-wide extension, there should exist only one TWCC child - # that handles packets from all incoming streams that have declared support for it + defp maybe_handle_twcc_receiver(rtp_extensions, pad_ssrc, ctx, state) do + # Workaround: as TWCC is a transport-wide extension, there should exist only one TWCC receiver + # child that handles packets from all incoming streams that have declared support for it. {maybe_twcc, rtp_extensions} = Keyword.pop(rtp_extensions, :twcc) should_link? = maybe_twcc != nil - should_create_child? = not Map.has_key?(ctx.children, :twcc) + should_create_child? = not Map.has_key?(ctx.children, :twcc_receiver) {maybe_twcc_ssrc, state} = if should_link? and should_create_child? do @@ -622,9 +626,9 @@ defmodule Membrane.RTP.SessionBin do |> via_in(Pad.ref(:input, pad_ssrc)) |> then(fn link -> if should_create_child? do - to(link, :twcc, %{maybe_twcc | sender_ssrc: maybe_twcc_ssrc}) + to(link, :twcc_receiver, %{maybe_twcc | feedback_sender_ssrc: maybe_twcc_ssrc}) else - to(link, :twcc) + to(link, :twcc_receiver) end end) |> via_out(Pad.ref(:output, pad_ssrc))) @@ -634,4 +638,27 @@ defmodule Membrane.RTP.SessionBin do {rtp_extensions, maybe_link_twcc, state} end + + defp maybe_handle_twcc_sender(pad_ssrc, ctx) do + # Workaround: as TWCC is a transport-wide extension, there should exist only one TWCC sender + # child that handles packets for all outgoing streams. As there is no support for declaring + # outbound extensions, outbound TWCC will be enabled if inbound TWCC has been enabled. + should_link? = Map.has_key?(ctx.children, :twcc_receiver) + should_create_child? = not Map.has_key?(ctx.children, :twcc_sender) + + if should_link? do + &(&1 + |> via_in(Pad.ref(:input, pad_ssrc)) + |> then(fn link -> + if should_create_child? do + to(link, :twcc_sender, RTP.TWCCSender) + else + to(link, :twcc_sender) + end + end) + |> via_out(Pad.ref(:output, pad_ssrc))) + else + & &1 + end + end end diff --git a/lib/membrane/rtp/twcc_receiver.ex b/lib/membrane/rtp/twcc_receiver.ex index 215fd226..341fa063 100644 --- a/lib/membrane/rtp/twcc_receiver.ex +++ b/lib/membrane/rtp/twcc_receiver.ex @@ -25,7 +25,7 @@ defmodule Membrane.RTP.TWCCReceiver do default: Membrane.Time.milliseconds(250), description: "How often to generate feedback packets." ], - sender_ssrc: [ + feedback_sender_ssrc: [ spec: RTP.ssrc_t() | nil, default: nil, description: @@ -38,14 +38,14 @@ defmodule Membrane.RTP.TWCCReceiver do @type t :: %__MODULE__{ twcc_id: 1..14, - sender_ssrc: RTP.ssrc_t() | nil, + feedback_sender_ssrc: RTP.ssrc_t() | nil, report_interval: Time.t(), packet_info_store: PacketInfoStore.t(), feedback_packet_count: non_neg_integer(), media_ssrc: RTP.ssrc_t() | nil } - @enforce_keys [:twcc_id, :report_interval, :sender_ssrc] + @enforce_keys [:twcc_id, :report_interval, :feedback_sender_ssrc] defstruct @enforce_keys ++ [ packet_info_store: %PacketInfoStore{}, @@ -137,7 +137,7 @@ defmodule Membrane.RTP.TWCCReceiver do %State{ packet_info_store: store, feedback_packet_count: feedback_packet_count, - sender_ssrc: sender_ssrc, + feedback_sender_ssrc: feedback_sender_ssrc, media_ssrc: media_ssrc } = state @@ -148,7 +148,7 @@ defmodule Membrane.RTP.TWCCReceiver do %RTCPEvent{ rtcp: %TransportFeedbackPacket{ - sender_ssrc: sender_ssrc, + sender_ssrc: feedback_sender_ssrc, media_ssrc: media_ssrc, payload: struct!(TransportFeedbackPacket.TWCC, stats) } diff --git a/lib/membrane/rtp/twcc_sender.ex b/lib/membrane/rtp/twcc_sender.ex new file mode 100644 index 00000000..c14b30b0 --- /dev/null +++ b/lib/membrane/rtp/twcc_sender.ex @@ -0,0 +1,52 @@ +defmodule Membrane.RTP.TWCCSender do + @moduledoc """ + The module defines an element responsible for tagging outgoing packets with transport-wide sequence numbers. + """ + use Membrane.Filter + + alias Membrane.RTP + alias Membrane.RTP.Header + + require Bitwise + + @seq_number_limit Bitwise.bsl(1, 16) + + def_input_pad :input, demand_unit: :buffers, caps: RTP, availability: :on_request + def_output_pad :output, caps: RTP, availability: :on_request + + @impl true + def handle_init(_options) do + {:ok, %{seq_num: 0}} + end + + @impl true + def handle_caps(Pad.ref(:input, id), caps, _ctx, state) do + {{:ok, caps: {Pad.ref(:output, id), caps}}, state} + end + + @impl true + def handle_demand(Pad.ref(:output, id), size, :buffers, _ctx, state) do + {{:ok, demand: {Pad.ref(:input, id), size}}, state} + end + + @impl true + def handle_event(Pad.ref(direction, id), event, _ctx, state) do + opposite_direction = if direction == :input, do: :output, else: :input + {{:ok, event: {Pad.ref(opposite_direction, id), event}}, state} + end + + @impl true + def handle_process(Pad.ref(:input, id), buffer, _ctx, state) do + {seq_num, state} = Map.get_and_update!(state, :seq_num, &{&1, rem(&1 + 1, @seq_number_limit)}) + + buffer = + Header.Extension.put(buffer, %Header.Extension{identifier: :twcc, data: <>}) + + {{:ok, buffer: {Pad.ref(:output, id), buffer}}, state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, id), _ctx, state) do + {{:ok, end_of_stream: Pad.ref(:output, id)}, state} + end +end diff --git a/test/membrane/rtp/twcc_receiver_test.exs b/test/membrane/rtp/twcc_receiver_test.exs index 75bddaab..8fdda3d6 100644 --- a/test/membrane/rtp/twcc_receiver_test.exs +++ b/test/membrane/rtp/twcc_receiver_test.exs @@ -12,7 +12,7 @@ defmodule Membrane.RTP.TWCCReceiverTest do @feedback_packet_count 100 @max_feedback_packet_count Bitwise.bsl(1, 8) - 1 - @sender_ssrc 1_234_567_890 + @feedback_sender_ssrc 1_234_567_890 @media_ssrc 9_876_543_210 @other_media_ssrc 1_111_111_111 @@ -31,7 +31,7 @@ defmodule Membrane.RTP.TWCCReceiverTest do state = %TWCCReceiver.State{ twcc_id: @default_twcc_id, - sender_ssrc: @sender_ssrc, + feedback_sender_ssrc: @feedback_sender_ssrc, report_interval: nil, feedback_packet_count: @feedback_packet_count } @@ -110,7 +110,7 @@ defmodule Membrane.RTP.TWCCReceiverTest do assert {{:ok, event: {_event_pad, event}}, _state} = TWCCReceiver.handle_tick(:report_timer, nil, state) - assert event.rtcp.sender_ssrc == @sender_ssrc + assert event.rtcp.sender_ssrc == @feedback_sender_ssrc end test "increments its feedback packet count", %{state: state, buffer: buffer} do From 0e7804c5d8e061df02f95136ce85b920ab368d54 Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Fri, 14 Jan 2022 16:22:31 +0100 Subject: [PATCH 04/10] handle malformed TWCC feedback packets --- .../rtcp/transport_feedback_packet/twcc.ex | 68 +++++++++++++------ 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index dde99cec..999207ae 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -60,18 +60,28 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do @impl true def decode( <> + payload::binary>> = packet ) do - receive_deltas = parse_feedback(payload, packet_status_count) - - {:ok, - %__MODULE__{ - base_seq_num: base_seq_num, - reference_time: Time.milliseconds(reference_time) * 64, - packet_status_count: packet_status_count, - receive_deltas: receive_deltas, - feedback_packet_count: feedback_packet_count - }} + case parse_feedback(payload, packet_status_count) do + {:ok, receive_deltas} -> + {:ok, + %__MODULE__{ + base_seq_num: base_seq_num, + reference_time: Time.milliseconds(reference_time) * 64, + packet_status_count: packet_status_count, + receive_deltas: receive_deltas, + feedback_packet_count: feedback_packet_count + }} + + {:error, reason} -> + Membrane.Logger.warn(""" + An error occured while parsing TWCC feedback packet. + Reason: #{reason} + Packet: #{inspect(packet, limit: :infinity)} + """) + + {:error, :malformed_packet} + end end @impl true @@ -105,9 +115,14 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do end defp parse_feedback(payload, packet_status_count) do - {receive_deltas, parsed_packet_status} = parse_packet_status(payload, packet_status_count, []) - - parse_receive_deltas(receive_deltas, parsed_packet_status, []) + with {:ok, {encoded_receive_deltas, parsed_packet_status}} <- + parse_packet_status(payload, packet_status_count, []), + {:ok, receive_deltas} <- + parse_receive_deltas(encoded_receive_deltas, parsed_packet_status, []) do + {:ok, receive_deltas} + else + {:error, reason} -> {:error, reason} + end end defp parse_packet_status(binary, packets_left, parsed_status) when packets_left <= 0 do @@ -115,7 +130,7 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do # status vector incomplete, filling the untaken slots with 0s - we may need to drop them parsed_status = Enum.drop(parsed_status, packets_left) - {binary, parsed_status} + {:ok, {binary, parsed_status}} end defp parse_packet_status( @@ -123,7 +138,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do rest::binary>>, packets_left, parsed_status - ) do + ) + when run_length <= packets_left do new_status = @packet_status_flags |> BiMap.fetch_key!(packet_status) @@ -150,7 +166,20 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do parse_packet_status(rest, packets_left - div(14, symbol_size), parsed_status ++ new_status) end - defp parse_receive_deltas(_padding, [], parsed_deltas), do: Enum.reverse(parsed_deltas) + defp parse_packet_status(_binary, _packets_left, _parsed_status), + do: {:error, :incorrect_run_length} + + defp parse_receive_deltas(padding, [], parsed_deltas) do + if :binary.decode_unsigned(padding) != 0 do + {:error, :invalid_padding} + else + {:ok, Enum.reverse(parsed_deltas)} + end + end + + defp parse_receive_deltas(binary, [:not_received | rest_status], parsed_deltas) do + parse_receive_deltas(binary, rest_status, [:not_received | parsed_deltas]) + end defp parse_receive_deltas( <>, @@ -168,9 +197,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do parse_receive_deltas(rest, rest_status, [Time.microseconds(delta) * 250 | parsed_deltas]) end - defp parse_receive_deltas(binary, [_not_received_or_reserved | rest_status], parsed_deltas) do - parse_receive_deltas(binary, rest_status, [:not_received | parsed_deltas]) - end + defp parse_receive_deltas(_binary, [:reserved | _rest_status], _parsed_deltas), + do: {:error, :symbol_reserved} defp make_packet_status_chunks(scaled_receive_deltas) do scaled_receive_deltas From d3ad3a61b71fce202a7fd5af6b89b1a95b16781e Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Tue, 18 Jan 2022 16:07:42 +0100 Subject: [PATCH 05/10] add tests for encoding and decoding twcc packets --- .../rtcp/transport_feedback_packet/twcc.ex | 2 + test/fixtures/rtcp/twcc_feedbacks.hex | 4 + .../rtcp/twcc_malformed_feedbacks.hex | 4 + test/membrane/rtcp/twcc_test.exs | 28 ++++++ test/support/rtcp_fixtures.ex | 99 +++++++++++++++++++ 5 files changed, 137 insertions(+) create mode 100644 test/fixtures/rtcp/twcc_feedbacks.hex create mode 100644 test/fixtures/rtcp/twcc_malformed_feedbacks.hex create mode 100644 test/membrane/rtcp/twcc_test.exs diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index 999207ae..bea178a1 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -218,6 +218,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do %StatusVector{vector: vector} = run_length_to_status_vector(run_length) [%StatusVector{vector: [status | vector], packet_count: count + 1} | rest] + # TODO: if no large or negative deltas has been encountered, consider using 1-bit status vector + [%StatusVector{vector: vector, packet_count: count} | rest] when count < @status_vector_capacity -> [%StatusVector{vector: [status | vector], packet_count: count + 1} | rest] diff --git a/test/fixtures/rtcp/twcc_feedbacks.hex b/test/fixtures/rtcp/twcc_feedbacks.hex new file mode 100644 index 00000000..7b9ce96e --- /dev/null +++ b/test/fixtures/rtcp/twcc_feedbacks.hex @@ -0,0 +1,4 @@ +772f00051ed96e0bd6504058feb80000 +849300061ed9c273c0148408 +0bb4000f1ed8fd0c2008e5554400000c08180004fffc341c041c0010 +518c00151ed89d0ed655d556d551a80cfff0041000100c0000000008fff81c0028300c04 \ No newline at end of file diff --git a/test/fixtures/rtcp/twcc_malformed_feedbacks.hex b/test/fixtures/rtcp/twcc_malformed_feedbacks.hex new file mode 100644 index 00000000..30103695 --- /dev/null +++ b/test/fixtures/rtcp/twcc_malformed_feedbacks.hex @@ -0,0 +1,4 @@ +772f00051ed96e0bd750405800000000 +772f000a1ed96e0bd6504058feb80000 +772f00051ed96e0bd6504058feb8000000000080 +0bb4000f1ed8fd0c6008e5554400000c08180004fffc341c041c0010 \ No newline at end of file diff --git a/test/membrane/rtcp/twcc_test.exs b/test/membrane/rtcp/twcc_test.exs new file mode 100644 index 00000000..64424574 --- /dev/null +++ b/test/membrane/rtcp/twcc_test.exs @@ -0,0 +1,28 @@ +defmodule Membrane.RTCP.TWCCTest do + use ExUnit.Case, async: true + + alias Membrane.RTCP.Fixtures + alias Membrane.RTCP.TransportFeedbackPacket.TWCC + + describe "TWCC module" do + test "encodes and decodes valid feedbacks" do + encoded_feedbacks = Fixtures.twcc_feedbacks() + expected_feedbacks = Fixtures.twcc_feedbacks_contents() + + encoded_feedbacks + |> Enum.zip(expected_feedbacks) + |> Enum.each(fn {encoded, expected} -> + assert TWCC.decode(encoded) == {:ok, expected} + assert TWCC.encode(expected) == encoded + end) + end + + test "does not decode malformed feedbacks" do + encoded_feedbacks = Fixtures.twcc_malformed_feedbacks() + + Enum.each(encoded_feedbacks, fn encoded -> + assert TWCC.decode(encoded) == {:error, :malformed_packet} + end) + end + end +end diff --git a/test/support/rtcp_fixtures.ex b/test/support/rtcp_fixtures.ex index a92773d3..fc3e5e06 100644 --- a/test/support/rtcp_fixtures.ex +++ b/test/support/rtcp_fixtures.ex @@ -13,6 +13,12 @@ defmodule Membrane.RTCP.Fixtures do @external_resource "test/fixtures/rtcp/malformed.hex" @malformed_packet File.read!("test/fixtures/rtcp/malformed.hex") + @external_resource "test/fixtures/rtcp/twcc_feedbacks.hex" + @twcc_feedbacks File.read!("test/fixtures/rtcp/twcc_feedbacks.hex") + + @external_resource "test/fixtures/rtcp/twcc_malformed_feedbacks.hex" + @twcc_malformed_feedbacks File.read!("test/fixtures/rtcp/twcc_malformed_feedbacks.hex") + @spec sample_packet_binary() :: binary() def sample_packet_binary, do: hex_to_bin(@sample_rtcp_packet) @@ -29,6 +35,20 @@ defmodule Membrane.RTCP.Fixtures do |> Enum.map(&hex_to_bin/1) end + @spec twcc_feedbacks() :: [binary()] + def twcc_feedbacks() do + @twcc_feedbacks + |> String.split() + |> Enum.map(&hex_to_bin/1) + end + + @spec twcc_malformed_feedbacks() :: [binary()] + def twcc_malformed_feedbacks() do + @twcc_malformed_feedbacks + |> String.split() + |> Enum.map(&hex_to_bin/1) + end + @spec packet_list_contents() :: [map()] def packet_list_contents() do ssrc = 0x62DBEFD0 @@ -94,6 +114,85 @@ defmodule Membrane.RTCP.Fixtures do ] end + @spec twcc_feedbacks_contents() :: [struct()] + def twcc_feedbacks_contents() do + [ + %Membrane.RTCP.TransportFeedbackPacket.TWCC{ + base_seq_num: 30_511, + feedback_packet_count: 11, + packet_status_count: 5, + receive_deltas: [16_000_000, 22_000_000, -82_000_000, 0, 0], + reference_time: 129_391_488_000_000 + }, + %Membrane.RTCP.TransportFeedbackPacket.TWCC{ + base_seq_num: 33_939, + feedback_packet_count: 115, + packet_status_count: 6, + receive_deltas: [ + :not_received, + :not_received, + :not_received, + :not_received, + 33_000_000, + 2_000_000 + ], + reference_time: 129_396_864_000_000 + }, + %Membrane.RTCP.TransportFeedbackPacket.TWCC{ + base_seq_num: 2996, + feedback_packet_count: 12, + packet_status_count: 15, + receive_deltas: [ + 17_000_000, + 0, + 0, + 3_000_000, + 2_000_000, + 6_000_000, + 0, + 1_000_000, + -1_000_000, + 13_000_000, + 7_000_000, + 1_000_000, + 7_000_000, + 0, + 4_000_000 + ], + reference_time: 129_384_256_000_000 + }, + %Membrane.RTCP.TransportFeedbackPacket.TWCC{ + base_seq_num: 20_876, + feedback_packet_count: 14, + packet_status_count: 21, + receive_deltas: [ + 42_000_000, + 3_000_000, + -4_000_000, + 1_000_000, + 4_000_000, + 0, + 4_000_000, + 3_000_000, + 0, + 0, + 0, + 0, + 2_000_000, + -2_000_000, + 7_000_000, + 0, + 10_000_000, + 12_000_000, + 3_000_000, + :not_received, + 1_000_000 + ], + reference_time: 129_378_112_000_000 + } + ] + end + # iso8601 does not allow nanoseconds @spec to_membrane_time(String.t(), non_neg_integer) :: Membrane.Time.t() def to_membrane_time(iso8601, nanoseconds) do From 78bf0013db0d11b9a0874084755661529129e5e0 Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Thu, 20 Jan 2022 10:09:20 +0100 Subject: [PATCH 06/10] minor improvements before PR --- .../rtcp/transport_feedback_packet/twcc.ex | 22 +++++++++++-------- lib/membrane/rtp/twcc_receiver.ex | 3 ++- mix.exs | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index bea178a1..156f7915 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -47,6 +47,7 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do @run_length_capacity @max_u13_val @status_vector_id 1 + @status_vector_symbol_1_bit_id 0 @status_vector_symbol_2_bit_id 1 @status_vector_capacity 7 @@ -153,9 +154,11 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do packets_left, parsed_status ) do - # vector_type = 0 -> 14 symbols, 1 bit each - # vector_type = 1 -> 7 symbols, 2 bits each - symbol_size = vector_type + 1 + {symbol_size, packets_parsed} = + case vector_type do + @status_vector_symbol_1_bit_id -> {1, 14} + @status_vector_symbol_2_bit_id -> {2, 7} + end # note about 1-bit symbols: the draft does not specify this, # but libwebrtc treats <<1::1>> as a "packet received, small delta" status @@ -163,7 +166,7 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do for <<(<> <- symbol_list)>>, do: BiMap.fetch_key!(@packet_status_flags, symbol) - parse_packet_status(rest, packets_left - div(14, symbol_size), parsed_status ++ new_status) + parse_packet_status(rest, packets_left - packets_parsed, parsed_status ++ new_status) end defp parse_packet_status(_binary, _packets_left, _parsed_status), @@ -232,10 +235,6 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do defp encode_receive_delta(scaled_delta) do case delta_to_packet_status(scaled_delta) do - :not_received -> - Membrane.Logger.warn("Reporting a non-received packet") - <<>> - :small_delta -> <> @@ -245,6 +244,10 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do ) <> + + :not_received -> + Membrane.Logger.warn("Reporting a non-received packet") + <<>> end end @@ -257,7 +260,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do <> end) - # we use 2-bit symbols, so padding size for an incomplete vector is 2*(number of unfilled slots) bits + # in current implementation we use only 2-bit symbols for encoding, so padding + # size for an incomplete vector is always 2*(number of unfilled slots) bits padding_size = 2 * (@status_vector_capacity - packet_count) <<@status_vector_id::1, @status_vector_symbol_2_bit_id::1, symbol_list::bitstring, diff --git a/lib/membrane/rtp/twcc_receiver.ex b/lib/membrane/rtp/twcc_receiver.ex index 341fa063..d52f52b2 100644 --- a/lib/membrane/rtp/twcc_receiver.ex +++ b/lib/membrane/rtp/twcc_receiver.ex @@ -1,6 +1,7 @@ defmodule Membrane.RTP.TWCCReceiver do @moduledoc """ - The module defines an element responsible for recording transport-wide statistics of incoming packets. + The module defines an element responsible for recording transport-wide statistics of incoming packets + and generating TWCC feedbacks. """ use Membrane.Filter diff --git a/mix.exs b/mix.exs index 955997ee..2dcdd3fb 100644 --- a/mix.exs +++ b/mix.exs @@ -63,7 +63,7 @@ defmodule Membrane.RTP.Plugin.MixProject do defp package do [ maintainers: ["Membrane Team"], - licenses: ["Apache 2.0"], + licenses: ["Apache-2.0"], links: %{ "GitHub" => @github_url, "Membrane Framework Homepage" => "https://membraneframework.org" From 64aff7d0b5b8f5d56c81f335eef4fd8e03525db9 Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Thu, 20 Jan 2022 10:11:21 +0100 Subject: [PATCH 07/10] update deps --- mix.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mix.lock b/mix.lock index 82c491b9..7eeac636 100644 --- a/mix.lock +++ b/mix.lock @@ -6,11 +6,11 @@ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.8.0", "d4fb0a6bb20b7c9c3643e22507e42f356ac090a1dcea9ab99e27e0376d695eba", [:rebar3], [], "hexpm", "6ac7efc1c6f8600b08d625292d4bbf584e14847ce1b6b5c44d983d273e1097ea"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, - "credo": {:hex, :credo, "1.6.1", "7dc76dcdb764a4316c1596804c48eada9fff44bd4b733a91ccbf0c0f368be61e", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "698607fb5993720c7e93d2d8e76f2175bba024de964e160e2f7151ef3ab82ac5"}, + "credo": {:hex, :credo, "1.6.2", "2f82b29a47c0bb7b72f023bf3a34d151624f1cbe1e6c4e52303b05a11166a701", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "ae9dc112bc368e7b145c547bec2ed257ef88955851c15057c7835251a17211c6"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.18", "e1b2be73eb08a49fb032a0208bf647380682374a725dfb5b9e510def8397f6f2", [:mix], [], "hexpm", "114a0e85ec3cf9e04b811009e73c206394ffecfcc313e0b346de0d557774ee97"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.19", "de0d033d5ff9fc396a24eadc2fcf2afa3d120841eb3f1004d138cbf9273210e8", [:mix], [], "hexpm", "527ab6630b5c75c3a3960b75844c314ec305c76d9899bb30f71cb85952a9dc45"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.26.0", "1922164bac0b18b02f84d6f69cab1b93bc3e870e2ad18d5dacb50a9e06b542a3", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2775d66e494a9a48355db7867478ffd997864c61c65a47d31c4949459281c78d"}, + "ex_doc": {:hex, :ex_doc, "0.27.3", "d09ed7ab590b71123959d9017f6715b54a448d76b43cf909eb0b2e5a78a977b2", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "ee60b329d08195039bfeb25231a208749be4f2274eae42ce38f9be0538a2f2e6"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.3.0", "964cf207fc44357b9ef103ade78e99004de70df8ee44bf1c8f6a18051d0d3667", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:unifex, "~> 0.7.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "62c2c40f6481ee670c864d26aa6496ad851c9b694bbd5367111bc93f3be2e44a"}, "ex_pcap": {:git, "https://github.com/membraneframework/expcap.git", "07c5bfa25280ea6a28d022d3a206ececf9b9913a", []}, "excoveralls": {:hex, :excoveralls, "0.14.4", "295498f1ae47bdc6dce59af9a585c381e1aefc63298d48172efaaa90c3d251db", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e3ab02f2df4c1c7a519728a6f0a747e71d7d6e846020aae338173619217931c1"}, @@ -19,7 +19,7 @@ "hackney": {:hex, :hackney, "1.18.0", "c4443d960bb9fba6d01161d01cd81173089686717d9490e5d3606644c48d121f", [:rebar3], [{:certifi, "~>2.8.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "9afcda620704d720db8c6a3123e9848d09c87586dc1c10479c42627b905b5c5e"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, - "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, + "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, "makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, From 3e4a123f73701e74641e8d132322cc41405e7558 Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Thu, 20 Jan 2022 10:19:56 +0100 Subject: [PATCH 08/10] fix credo --- lib/membrane/rtcp/sdes_packet.ex | 2 +- lib/membrane/rtp/packet.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/membrane/rtcp/sdes_packet.ex b/lib/membrane/rtcp/sdes_packet.ex index e8b76b42..df30a727 100644 --- a/lib/membrane/rtcp/sdes_packet.ex +++ b/lib/membrane/rtcp/sdes_packet.ex @@ -106,7 +106,7 @@ defmodule Membrane.RTCP.SdesPacket do parse_items(rest, [item | acc]) end - defp parse_items(<<_si_type::8, _::binary>>, _acc) do + defp parse_items(<<_si_type::8, _payload::binary>>, _acc) do {:error, :unknown_si_type} end diff --git a/lib/membrane/rtp/packet.ex b/lib/membrane/rtp/packet.ex index 4449517f..d4f0f4f4 100644 --- a/lib/membrane/rtp/packet.ex +++ b/lib/membrane/rtp/packet.ex @@ -88,7 +88,7 @@ defmodule Membrane.RTP.Packet do | {:error, :wrong_version | :malformed_packet} def parse(packet, encrypted?) - def parse(<>, _encrypted?) when version != 2, + def parse(<>, _encrypted?) when version != 2, do: {:error, :wrong_version} def parse( From 76eec709f5d5c92ef71891b5e9158103dbf6a41d Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Thu, 20 Jan 2022 11:25:06 +0100 Subject: [PATCH 09/10] adjust to CR suggestions --- .../rtcp/transport_feedback_packet/twcc.ex | 24 ++++++++----------- lib/membrane/rtp/session_bin.ex | 15 ++++++++---- test/membrane/rtcp/twcc_test.exs | 6 ++--- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index 156f7915..1cb40258 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -117,21 +117,17 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do defp parse_feedback(payload, packet_status_count) do with {:ok, {encoded_receive_deltas, parsed_packet_status}} <- - parse_packet_status(payload, packet_status_count, []), - {:ok, receive_deltas} <- - parse_receive_deltas(encoded_receive_deltas, parsed_packet_status, []) do - {:ok, receive_deltas} - else - {:error, reason} -> {:error, reason} + parse_packet_status(payload, packet_status_count, []) do + parse_receive_deltas(encoded_receive_deltas, parsed_packet_status, []) end end - defp parse_packet_status(binary, packets_left, parsed_status) when packets_left <= 0 do + defp parse_packet_status(rest, packets_left, parsed_status) when packets_left <= 0 do # note about incomplete vectors: the draft does not specify this, but libwebrtc can make the last # status vector incomplete, filling the untaken slots with 0s - we may need to drop them parsed_status = Enum.drop(parsed_status, packets_left) - {:ok, {binary, parsed_status}} + {:ok, {rest, parsed_status}} end defp parse_packet_status( @@ -169,8 +165,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do parse_packet_status(rest, packets_left - packets_parsed, parsed_status ++ new_status) end - defp parse_packet_status(_binary, _packets_left, _parsed_status), - do: {:error, :incorrect_run_length} + defp parse_packet_status(_payload, _packets_left, _parsed_status), + do: {:error, :invalid_run_length} defp parse_receive_deltas(padding, [], parsed_deltas) do if :binary.decode_unsigned(padding) != 0 do @@ -180,8 +176,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do end end - defp parse_receive_deltas(binary, [:not_received | rest_status], parsed_deltas) do - parse_receive_deltas(binary, rest_status, [:not_received | parsed_deltas]) + defp parse_receive_deltas(rest, [:not_received | rest_status], parsed_deltas) do + parse_receive_deltas(rest, rest_status, [:not_received | parsed_deltas]) end defp parse_receive_deltas( @@ -200,7 +196,7 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do parse_receive_deltas(rest, rest_status, [Time.microseconds(delta) * 250 | parsed_deltas]) end - defp parse_receive_deltas(_binary, [:reserved | _rest_status], _parsed_deltas), + defp parse_receive_deltas(_payload, [:reserved | _rest_status], _parsed_deltas), do: {:error, :symbol_reserved} defp make_packet_status_chunks(scaled_receive_deltas) do @@ -221,7 +217,7 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do %StatusVector{vector: vector} = run_length_to_status_vector(run_length) [%StatusVector{vector: [status | vector], packet_count: count + 1} | rest] - # TODO: if no large or negative deltas has been encountered, consider using 1-bit status vector + # TODO: if no large or negative delta has been encountered, consider using 1-bit status vector [%StatusVector{vector: vector, packet_count: count} | rest] when count < @status_vector_capacity -> diff --git a/lib/membrane/rtp/session_bin.ex b/lib/membrane/rtp/session_bin.ex index f2e4733d..ddbe52f6 100644 --- a/lib/membrane/rtp/session_bin.ex +++ b/lib/membrane/rtp/session_bin.ex @@ -77,7 +77,8 @@ defmodule Membrane.RTP.SessionBin do ### TWCC TWCC as a transport-wide extension is handled differently, and is linked from `RTP.SSRCRouter` to possibly many `RTP.StreamReceiveBin`s. Only the first TWCC extension is initialized, and it - will handle all RTP streams that have declared support for it. + will handle all RTP streams that have declared support for it. For outgoing streams, an `RTP.TWCCSender` + element will be spawned and linked to all `RTP.StreamSendBin`s. """ @type rtp_extension_options_t :: {extension_name :: rtp_extension_name_t(), @@ -622,7 +623,8 @@ defmodule Membrane.RTP.SessionBin do maybe_link_twcc = if should_link? do - &(&1 + fn link_builder -> + link_builder |> via_in(Pad.ref(:input, pad_ssrc)) |> then(fn link -> if should_create_child? do @@ -631,7 +633,8 @@ defmodule Membrane.RTP.SessionBin do to(link, :twcc_receiver) end end) - |> via_out(Pad.ref(:output, pad_ssrc))) + |> via_out(Pad.ref(:output, pad_ssrc)) + end else & &1 end @@ -647,7 +650,8 @@ defmodule Membrane.RTP.SessionBin do should_create_child? = not Map.has_key?(ctx.children, :twcc_sender) if should_link? do - &(&1 + fn link_builder -> + link_builder |> via_in(Pad.ref(:input, pad_ssrc)) |> then(fn link -> if should_create_child? do @@ -656,7 +660,8 @@ defmodule Membrane.RTP.SessionBin do to(link, :twcc_sender) end end) - |> via_out(Pad.ref(:output, pad_ssrc))) + |> via_out(Pad.ref(:output, pad_ssrc)) + end else & &1 end diff --git a/test/membrane/rtcp/twcc_test.exs b/test/membrane/rtcp/twcc_test.exs index 64424574..197a5ed6 100644 --- a/test/membrane/rtcp/twcc_test.exs +++ b/test/membrane/rtcp/twcc_test.exs @@ -12,8 +12,8 @@ defmodule Membrane.RTCP.TWCCTest do encoded_feedbacks |> Enum.zip(expected_feedbacks) |> Enum.each(fn {encoded, expected} -> - assert TWCC.decode(encoded) == {:ok, expected} - assert TWCC.encode(expected) == encoded + assert {:ok, expected} == TWCC.decode(encoded) + assert encoded == TWCC.encode(expected) end) end @@ -21,7 +21,7 @@ defmodule Membrane.RTCP.TWCCTest do encoded_feedbacks = Fixtures.twcc_malformed_feedbacks() Enum.each(encoded_feedbacks, fn encoded -> - assert TWCC.decode(encoded) == {:error, :malformed_packet} + assert {:error, :malformed_packet} == TWCC.decode(encoded) end) end end From 3c7c63db7d665e6e665cd51b2bf0277302a57b2c Mon Sep 17 00:00:00 2001 From: Jakub Balinski Date: Thu, 20 Jan 2022 15:05:48 +0100 Subject: [PATCH 10/10] make `reference_time` a signed integer to comply with specification --- lib/membrane/rtcp/transport_feedback_packet/twcc.ex | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index 1cb40258..773ea959 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -60,8 +60,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do @impl true def decode( - <> = packet + <> = packet ) do case parse_feedback(payload, packet_status_count) do {:ok, receive_deltas} -> @@ -103,7 +103,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do reference_time = div(reference_time, Time.milliseconds(64)) encoded_header = - <> + <> encoded_packet_status_chunks = Enum.map_join(packet_status_chunks, &encode_packet_status_chunk/1)