Skip to content

Commit

Permalink
Merge pull request #88 from membraneframework/outbound-twcc
Browse files Browse the repository at this point in the history
Outbound TWCC
  • Loading branch information
balins authored Jan 24, 2022
2 parents 95f3667 + 3c7c63d commit 5c1d7af
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 106 deletions.
2 changes: 1 addition & 1 deletion lib/membrane/rtcp/sdes_packet.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ defmodule Membrane.RTCP.SdesPacket do
parse_items(rest, [item | acc])
end

defp parse_items(<<_si_type::8, _::binary>>, _acc) do
defp parse_items(<<_si_type::8, _payload::binary>>, _acc) do
{:error, :unknown_si_type}
end

Expand Down
236 changes: 157 additions & 79 deletions lib/membrane/rtcp/transport_feedback_packet/twcc.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do
@moduledoc """
Serializes [Transport-wide congestion control](https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01)
Encodes and decodes [Transport-wide congestion control](https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01)
feedback packets.
"""
@behaviour Membrane.RTCP.TransportFeedbackPacket
Expand Down Expand Up @@ -47,29 +47,42 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do
@run_length_capacity @max_u13_val

@status_vector_id 1
@status_vector_symbol_1_bit_id 0
@status_vector_symbol_2_bit_id 1
@status_vector_capacity 7

@packet_status_code %{
not_received: 0,
small_delta: 1,
large_or_negative_delta: 2,
reserved: 3
}
@packet_status_flags BiMap.new(%{
not_received: 0,
small_delta: 1,
large_or_negative_delta: 2,
reserved: 3
})

@impl true
def decode(binary) do
<<base_seq_num::16, packet_status_count::16, reference_time::24, feedback_packet_count::8,
_rest::binary>> = binary

{:ok,
%__MODULE__{
base_seq_num: base_seq_num,
reference_time: reference_time * Time.milliseconds(64),
packet_status_count: packet_status_count,
receive_deltas: [],
feedback_packet_count: feedback_packet_count
}}
def decode(
<<base_seq_num::16, packet_status_count::16, reference_time::signed-integer-size(24),
feedback_packet_count::8, payload::binary>> = packet
) do
case parse_feedback(payload, packet_status_count) do
{:ok, receive_deltas} ->
{:ok,
%__MODULE__{
base_seq_num: base_seq_num,
reference_time: Time.milliseconds(reference_time) * 64,
packet_status_count: packet_status_count,
receive_deltas: receive_deltas,
feedback_packet_count: feedback_packet_count
}}

{:error, reason} ->
Membrane.Logger.warn("""
An error occured while parsing TWCC feedback packet.
Reason: #{reason}
Packet: #{inspect(packet, limit: :infinity)}
""")

{:error, :malformed_packet}
end
end

@impl true
Expand All @@ -89,13 +102,104 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do
# https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01#section-3.1
reference_time = div(reference_time, Time.milliseconds(64))

payload =
<<base_seq_num::16, packet_status_count::16, reference_time::24, feedback_packet_count::8>> <>
encode_packet_status(packet_status_chunks) <> encode_receive_deltas(scaled_receive_deltas)
encoded_header =
<<base_seq_num::16, packet_status_count::16, reference_time::signed-integer-size(24),
feedback_packet_count::8>>

encoded_packet_status_chunks =
Enum.map_join(packet_status_chunks, &encode_packet_status_chunk/1)

encoded_receive_deltas = Enum.map_join(scaled_receive_deltas, &encode_receive_delta/1)

maybe_add_padding(payload)
[encoded_header, encoded_packet_status_chunks, encoded_receive_deltas]
|> Enum.join()
|> maybe_add_padding()
end

defp parse_feedback(payload, packet_status_count) do
with {:ok, {encoded_receive_deltas, parsed_packet_status}} <-
parse_packet_status(payload, packet_status_count, []) do
parse_receive_deltas(encoded_receive_deltas, parsed_packet_status, [])
end
end

defp parse_packet_status(rest, packets_left, parsed_status) when packets_left <= 0 do
# note about incomplete vectors: the draft does not specify this, but libwebrtc can make the last
# status vector incomplete, filling the untaken slots with 0s - we may need to drop them
parsed_status = Enum.drop(parsed_status, packets_left)

{:ok, {rest, parsed_status}}
end

defp parse_packet_status(
<<@run_length_id::1, packet_status::2, run_length::unsigned-integer-size(13),
rest::binary>>,
packets_left,
parsed_status
)
when run_length <= packets_left do
new_status =
@packet_status_flags
|> BiMap.fetch_key!(packet_status)
|> List.duplicate(run_length)

parse_packet_status(rest, packets_left - run_length, parsed_status ++ new_status)
end

defp parse_packet_status(
<<@status_vector_id::1, vector_type::1, symbol_list::bits-size(14), rest::binary>>,
packets_left,
parsed_status
) do
{symbol_size, packets_parsed} =
case vector_type do
@status_vector_symbol_1_bit_id -> {1, 14}
@status_vector_symbol_2_bit_id -> {2, 7}
end

# note about 1-bit symbols: the draft does not specify this,
# but libwebrtc treats <<1::1>> as a "packet received, small delta" status
new_status =
for <<(<<symbol::size(symbol_size)>> <- symbol_list)>>,
do: BiMap.fetch_key!(@packet_status_flags, symbol)

parse_packet_status(rest, packets_left - packets_parsed, parsed_status ++ new_status)
end

defp parse_packet_status(_payload, _packets_left, _parsed_status),
do: {:error, :invalid_run_length}

defp parse_receive_deltas(padding, [], parsed_deltas) do
if :binary.decode_unsigned(padding) != 0 do
{:error, :invalid_padding}
else
{:ok, Enum.reverse(parsed_deltas)}
end
end

defp parse_receive_deltas(rest, [:not_received | rest_status], parsed_deltas) do
parse_receive_deltas(rest, rest_status, [:not_received | parsed_deltas])
end

defp parse_receive_deltas(
<<delta::unsigned-integer-size(8), rest::binary>>,
[:small_delta | rest_status],
parsed_deltas
) do
parse_receive_deltas(rest, rest_status, [Time.microseconds(delta) * 250 | parsed_deltas])
end

defp parse_receive_deltas(
<<delta::signed-integer-size(16), rest::binary>>,
[:large_or_negative_delta | rest_status],
parsed_deltas
) do
parse_receive_deltas(rest, rest_status, [Time.microseconds(delta) * 250 | parsed_deltas])
end

defp parse_receive_deltas(_payload, [:reserved | _rest_status], _parsed_deltas),
do: {:error, :symbol_reserved}

defp make_packet_status_chunks(scaled_receive_deltas) do
scaled_receive_deltas
|> Enum.map(&delta_to_packet_status/1)
Expand All @@ -114,6 +218,8 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do
%StatusVector{vector: vector} = run_length_to_status_vector(run_length)
[%StatusVector{vector: [status | vector], packet_count: count + 1} | rest]

# TODO: if no large or negative delta has been encountered, consider using 1-bit status vector

[%StatusVector{vector: vector, packet_count: count} | rest]
when count < @status_vector_capacity ->
[%StatusVector{vector: [status | vector], packet_count: count + 1} | rest]
Expand All @@ -124,71 +230,43 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do
end)
end

defp encode_receive_deltas(scaled_receive_deltas) do
Enum.map_join(scaled_receive_deltas, fn delta ->
case delta_to_packet_status(delta) do
:not_received ->
<<>>

:small_delta ->
<<delta::8>>
defp encode_receive_delta(scaled_delta) do
case delta_to_packet_status(scaled_delta) do
:small_delta ->
<<scaled_delta::8>>

:large_or_negative_delta ->
Membrane.Logger.warn(
"Reporting a packet with large or negative delta: (#{inspect(delta / 4)}ms)"
)
:large_or_negative_delta ->
Membrane.Logger.warn(
"Reporting a packet with large or negative delta: (#{inspect(scaled_delta / 4)}ms)"
)

<<cap_delta(delta)::16>>
end
end)
end
<<cap_delta(scaled_delta)::16>>

defp encode_packet_status(packet_status_chunks) do
Enum.map_join(packet_status_chunks, fn chunk ->
case chunk do
%RunLength{} ->
encode_run_length(chunk)

%StatusVector{packet_count: @status_vector_capacity} ->
encode_status_vector(chunk)

%StatusVector{} ->
chunk
|> status_vector_to_run_length()
|> Enum.map_join(&encode_run_length/1)
end
end)
:not_received ->
Membrane.Logger.warn("Reporting a non-received packet")
<<>>
end
end

defp encode_run_length(%RunLength{packet_status: status, packet_count: count}),
do: <<@run_length_id::1, @packet_status_code[status]::2, count::13>>
defp encode_packet_status_chunk(%RunLength{packet_status: status, packet_count: packet_count}),
do: <<@run_length_id::1, BiMap.fetch!(@packet_status_flags, status)::2, packet_count::13>>

defp encode_status_vector(%StatusVector{vector: vector, packet_count: @status_vector_capacity}) do
defp encode_packet_status_chunk(%StatusVector{vector: vector, packet_count: packet_count}) do
symbol_list =
Enum.reduce(vector, <<>>, fn status, acc ->
<<acc::bitstring, @packet_status_code[status]::2>>
<<acc::bitstring, BiMap.fetch!(@packet_status_flags, status)::2>>
end)

<<(<<@status_vector_id::1, @status_vector_symbol_2_bit_id::1>>)::bitstring,
symbol_list::bitstring>>
# in current implementation we use only 2-bit symbols for encoding, so padding
# size for an incomplete vector is always 2*(number of unfilled slots) bits
padding_size = 2 * (@status_vector_capacity - packet_count)

<<@status_vector_id::1, @status_vector_symbol_2_bit_id::1, symbol_list::bitstring,
0::size(padding_size)>>
end

defp run_length_to_status_vector(%RunLength{packet_status: status, packet_count: count}),
do: %StatusVector{vector: Enum.map(1..count, fn _i -> status end), packet_count: count}

defp status_vector_to_run_length(%StatusVector{vector: vector}) do
vector
|> Enum.reduce([], fn status, acc ->
case acc do
[%RunLength{packet_status: ^status, packet_count: count} | rest] ->
[%RunLength{packet_status: status, packet_count: count + 1} | rest]

_empty_acc_or_other_status ->
[%RunLength{packet_status: status, packet_count: 1} | acc]
end
end)
|> Enum.reverse()
end
do: %StatusVector{vector: List.duplicate(status, count), packet_count: count}

defp scale_delta(:not_received), do: :not_received

Expand All @@ -215,11 +293,11 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do
end

defp maybe_add_padding(payload) do
bits_remaining = rem(bit_size(payload), 32)
bytes_remaining = rem(byte_size(payload), 4)

if bits_remaining > 0 do
padding_size = 32 - bits_remaining
<<payload::bitstring, 0::size(padding_size)>>
if bytes_remaining > 0 do
padding_size = 4 - bytes_remaining
<<payload::binary, 0::size(padding_size)-unit(8)>>
else
payload
end
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/rtp/packet.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ defmodule Membrane.RTP.Packet do
| {:error, :wrong_version | :malformed_packet}
def parse(packet, encrypted?)

def parse(<<version::2, _::bitstring>>, _encrypted?) when version != 2,
def parse(<<version::2, _payload::bitstring>>, _encrypted?) when version != 2,
do: {:error, :wrong_version}

def parse(
Expand Down
Loading

0 comments on commit 5c1d7af

Please sign in to comment.