Skip to content

Commit

Permalink
[RTC-9] Support padding packets (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-jodlos authored Nov 30, 2022
1 parent 4d2e8dd commit 189ef49
Show file tree
Hide file tree
Showing 17 changed files with 128 additions and 81 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The package can be installed by adding `membrane_rtp_plugin` to your list of dep
```elixir
def deps do
[
{:membrane_rtp_plugin, "~> 0.16.0"}
{:membrane_rtp_plugin, "~> 0.17.0"}
{:ex_libsrtp, "~> 0.3.0"} # required only if SRTP/SRTCP support is needed
]
end
Expand Down
12 changes: 1 addition & 11 deletions lib/membrane/rtp/header_generator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,7 @@ defmodule Membrane.RTP.HeaderGenerator do

def_options ssrc: [spec: RTP.ssrc_t()],
payload_type: [spec: RTP.payload_type_t()],
clock_rate: [spec: RTP.clock_rate_t()],
alignment: [
default: 1,
spec: pos_integer(),
description: """
Number of bytes that each packet should be aligned to.
Alignment is achieved by adding RTP padding.
"""
]
clock_rate: [spec: RTP.clock_rate_t()]

defmodule State do
@moduledoc false
Expand All @@ -39,7 +31,6 @@ defmodule Membrane.RTP.HeaderGenerator do
:ssrc,
:payload_type,
:clock_rate,
:alignment,
sequence_number: 0,
init_timestamp: 0
]
Expand All @@ -48,7 +39,6 @@ defmodule Membrane.RTP.HeaderGenerator do
ssrc: RTP.ssrc_t(),
payload_type: RTP.payload_type_t(),
clock_rate: RTP.clock_rate_t(),
alignment: pos_integer(),
sequence_number: non_neg_integer(),
init_timestamp: non_neg_integer()
}
Expand Down
16 changes: 4 additions & 12 deletions lib/membrane/rtp/outbound_tracking_serializer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,7 @@ defmodule Membrane.RTP.OutboundTrackingSerializer do
def_options ssrc: [spec: RTP.ssrc_t()],
payload_type: [spec: RTP.payload_type_t()],
clock_rate: [spec: RTP.clock_rate_t()],
extension_mapping: [spec: RTP.SessionBin.rtp_extension_mapping_t()],
alignment: [
default: 1,
spec: pos_integer(),
description: """
Number of bytes that each packet should be aligned to.
Alignment is achieved by adding RTP padding.
"""
]
extension_mapping: [spec: RTP.SessionBin.rtp_extension_mapping_t()]

defmodule State do
@moduledoc false
Expand All @@ -52,7 +44,6 @@ defmodule Membrane.RTP.OutboundTrackingSerializer do
ssrc: RTP.ssrc_t(),
payload_type: RTP.payload_type_t(),
extension_mapping: RTP.SessionBin.rtp_extension_mapping_t(),
alignment: pos_integer(),
any_buffer_sent?: boolean(),
rtcp_output_pad: Membrane.Pad.ref_t() | nil,
stats_acc: %{}
Expand All @@ -61,7 +52,6 @@ defmodule Membrane.RTP.OutboundTrackingSerializer do
defstruct ssrc: 0,
payload_type: 0,
extension_mapping: %{},
alignment: 1,
any_buffer_sent?: false,
rtcp_output_pad: nil,
stats_acc: %{
Expand Down Expand Up @@ -171,9 +161,11 @@ defmodule Membrane.RTP.OutboundTrackingSerializer do
extensions: extensions
})

padding_size = Map.get(rtp_metadata, :padding_size, 0)

payload =
RTP.Packet.serialize(%RTP.Packet{header: header, payload: buffer.payload},
align_to: state.alignment
padding_size: padding_size
)

buffer = %Buffer{buffer | payload: payload, metadata: metadata}
Expand Down
37 changes: 19 additions & 18 deletions lib/membrane/rtp/packet.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ defmodule Membrane.RTP.Packet do
payload: binary()
}

@typedoc """
Possible padding size.
It includes the last byte denoting the size of the padding.
"""
@type padding_size :: 0..255

@enforce_keys [:header, :payload]
defstruct @enforce_keys

Expand All @@ -27,28 +34,21 @@ defmodule Membrane.RTP.Packet do

def identify(_packet), do: :rtp

@spec serialize(t, align_to: pos_integer()) :: binary
def serialize(%__MODULE__{} = packet, [align_to: align_to] \\ [align_to: 1]) do
@spec serialize(t, padding_size: padding_size()) :: binary
def serialize(%__MODULE__{} = packet, opts \\ []) do
%__MODULE__{header: header, payload: payload} = packet
%Header{version: 2} = header
has_padding = 0
padding_size = Keyword.get(opts, :padding_size, 0)
has_padding = if padding_size > 0, do: 1, else: 0
has_extension = if header.extensions == [], do: 0, else: 1
marker = if header.marker, do: 1, else: 0
csrcs = Enum.map_join(header.csrcs, &<<&1::32>>)
padding = Utils.generate_padding(padding_size)

serialized =
<<header.version::2, has_padding::1, has_extension::1, length(header.csrcs)::4, marker::1,
header.payload_type::7, header.sequence_number::16, header.timestamp::32, header.ssrc::32,
csrcs::binary, serialize_header_extensions(header.extensions)::binary, payload::binary>>

case Utils.align(serialized, align_to) do
{serialized, 0} ->
serialized

{serialized, _padding} ->
<<pre::2, _has_padding::1, post::bitstring>> = serialized
<<pre::2, 1::1, post::bitstring>>
end
<<header.version::2, has_padding::1, has_extension::1, length(header.csrcs)::4, marker::1,
header.payload_type::7, header.sequence_number::16, header.timestamp::32, header.ssrc::32,
csrcs::binary, serialize_header_extensions(header.extensions)::binary, payload::binary,
padding::binary>>
end

defp serialize_header_extensions([]), do: <<>>
Expand Down Expand Up @@ -84,7 +84,8 @@ defmodule Membrane.RTP.Packet do
end

@spec parse(binary(), boolean()) ::
{:ok, %{packet: t(), has_padding?: boolean(), total_header_size: non_neg_integer()}}
{:ok,
%{packet: t(), padding_size: padding_size(), total_header_size: non_neg_integer()}}
| {:error, :wrong_version | :malformed_packet}
def parse(packet, encrypted?)

Expand Down Expand Up @@ -118,7 +119,7 @@ defmodule Membrane.RTP.Packet do
header: header,
payload: if(encrypted?, do: original_packet, else: payload)
},
has_padding?: has_padding == 1,
padding_size: padding,
total_header_size: byte_size(original_packet) - byte_size(payload) - padding
}}
else
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/rtp/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ defmodule Membrane.RTP.Parser do
def handle_process(:input, %Buffer{payload: payload, metadata: metadata} = buffer, _ctx, state) do
with :rtp <- RTP.Packet.identify(payload),
{:ok,
%{packet: packet, has_padding?: has_padding?, total_header_size: total_header_size}} <-
%{packet: packet, padding_size: padding_size, total_header_size: total_header_size}} <-
RTP.Packet.parse(payload, state.secure?) do
%RTP.Packet{payload: payload, header: header} = packet

rtp =
header
|> Map.take(@metadata_fields)
|> Map.merge(%{has_padding?: has_padding?, total_header_size: total_header_size})
|> Map.merge(%{padding_size: padding_size, total_header_size: total_header_size})

metadata = Map.put(metadata, :rtp, rtp)
{{:ok, buffer: {:output, %Buffer{payload: payload, metadata: metadata}}}, state}
Expand Down
13 changes: 2 additions & 11 deletions lib/membrane/rtp/payloader_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,7 @@ defmodule Membrane.RTP.PayloaderBin do
],
ssrc: [spec: RTP.ssrc_t()],
payload_type: [spec: RTP.payload_type_t()],
clock_rate: [spec: RTP.clock_rate_t()],
alignment: [
default: 1,
spec: pos_integer(),
description: """
Number of bytes that each packet should be aligned to.
Alignment is achieved by adding RTP padding.
"""
]
clock_rate: [spec: RTP.clock_rate_t()]

@impl true
def handle_init(opts) do
Expand All @@ -35,8 +27,7 @@ defmodule Membrane.RTP.PayloaderBin do
|> to(:header_generator, %RTP.HeaderGenerator{
ssrc: opts.ssrc,
payload_type: opts.payload_type,
clock_rate: opts.clock_rate,
alignment: opts.alignment
clock_rate: opts.clock_rate
})
|> to_bin_output()
]
Expand Down
18 changes: 18 additions & 0 deletions lib/membrane/rtp/session_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ defmodule Membrane.RTP.SessionBin do
In such a case, SessionBin will receive payloaded packets and work as a simple proxy just forwarding the packets (and decrypting them if necessary).
Therefore it is possible to specify in newly added pads if payloaders/depayloaders should be used for the certain stream.
## Padding
Addition and removal of a padding from RTP packets is handled by the RTP plugin.
To send a packet with a padding, one should include `padding_size` field in `Membrane.Buffer`'s metadata.
E.g.
```elixir
metadata = %{rtp: %{padding_size: 20}}
```
will result in adding 20 bytes of padding at the end of packet's payload.
When parsing an RTP stream, a padding is stripped out and the `padding_size` field is set appropriately to the
number of bytes that were removed.
For more information, please refer to the [RFC 3550, sec. 5](https://www.rfc-editor.org/rfc/rfc3550#section-5)
## RTCP
RTCP packets for inbound stream can be provided either in-band or via a separate `rtp_input` pad instance. Corresponding
receiver report packets will be sent back through `rtcp_receiver_output` with the same id as `rtp_input` for the RTP stream.
Expand Down
12 changes: 11 additions & 1 deletion lib/membrane/rtp/twcc_sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,20 @@ defmodule Membrane.RTP.TWCCSender do
buffer =
Header.Extension.put(buffer, %Header.Extension{identifier: :twcc, data: <<seq_num::16>>})

# TODO take into account header size
# we are not taking into account header
# size here which doesn't seem to be
# fully correct
padding_size = Map.get(buffer.metadata.rtp, :padding_size, 0)

unless padding_size in 0..255, do: raise("padding_size has to be in 0..255")

overall_payload_size = bit_size(buffer.payload) + padding_size * 8

state =
state
|> put_in([:seq_to_timestamp, seq_num], Time.vm_time())
|> put_in([:seq_to_size, seq_num], bit_size(buffer.payload))
|> put_in([:seq_to_size, seq_num], overall_payload_size)

out_pad = Pad.ref(:output, id)
[buffer: {out_pad, buffer}] |> send_when_pad_connected(out_pad, ctx, state)
Expand Down
17 changes: 7 additions & 10 deletions lib/membrane/rtp/utils.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Membrane.RTP.Utils do
@moduledoc false

alias Membrane.RTP.Packet

@spec strip_padding(binary, padding_present? :: boolean) ::
{:ok, {binary, padding_size :: non_neg_integer()}} | :error
def strip_padding(binary, padding_present?)
Expand All @@ -17,17 +19,12 @@ defmodule Membrane.RTP.Utils do
end
end

@spec align(payload :: binary, align_to :: pos_integer()) ::
{binary, padding_size :: non_neg_integer()}
def align(payload, align_to) do
case rem(byte_size(payload), align_to) do
0 ->
{payload, 0}
@spec generate_padding(Packet.padding_size()) :: binary()
def generate_padding(0), do: <<>>

padding_size ->
zeros_no = padding_size - 1
{<<payload::binary, 0::size(zeros_no)-unit(8), padding_size>>, padding_size}
end
def generate_padding(padding_size) when padding_size in 1..255 do
zeros_no = padding_size - 1
<<0::size(zeros_no)-unit(8), padding_size>>
end

@spec from_which_rollover(number() | nil, number(), number()) :: :current | :previous | :next
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/srtp/decryptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ if Code.ensure_loaded?(ExLibSRTP) do
payload: payload,
metadata: %{
rtp: %{
has_padding?: has_padding?,
padding_size: padding_size,
total_header_size: total_header_size
}
}
Expand All @@ -104,7 +104,7 @@ if Code.ensure_loaded?(ExLibSRTP) do
# decrypted payload contains the header that we can simply strip without any parsing as we know its length
<<_header::binary-size(total_header_size), payload::binary>> = payload

{:ok, {payload, _size}} = Utils.strip_padding(payload, has_padding?)
{:ok, {payload, _size}} = Utils.strip_padding(payload, padding_size > 0)

{{:ok, buffer: {:output, %Buffer{buffer | payload: payload}}}, state}

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTP.Plugin.MixProject do
use Mix.Project

@version "0.16.0"
@version "0.17.0"
@github_url "https://github.com/membraneframework/membrane_rtp_plugin"

def project do
Expand Down
Binary file added test/fixtures/rtp/rtp_packet_with_padding.bin
Binary file not shown.
20 changes: 10 additions & 10 deletions test/membrane/rtp/packet_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ defmodule Membrane.RTP.PacketTest do
assert Packet.serialize(packet) == packet_binary
end

test "ignores padding" do
test_padding_size = 2
padding_octets = test_padding_size - 1
test_padding = <<0::size(padding_octets)-unit(8), test_padding_size>>
<<version::2, _padding::1, rest::bitstring>> = Fixtures.sample_packet_binary()
test_packet = <<version::2, 1::1, rest::bitstring, test_padding::binary>>

sample_packet = Fixtures.sample_packet()
test "generates padding" do
ref_packet = Fixtures.sample_packet_binary_with_padding()
test_packet = Fixtures.sample_packet()
assert ref_packet == Packet.serialize(test_packet, padding_size: 20)
end

assert {:ok, %{packet: ^sample_packet}} = Packet.parse(test_packet, @encrypted?)
test "ignores padding" do
ref_packet = Fixtures.sample_packet()
test_packet = Fixtures.sample_packet_binary_with_padding()

assert Packet.serialize(Fixtures.sample_packet(), align_to: 4) == test_packet
assert {:ok, %{packet: ^ref_packet, padding_size: 20}} =
Packet.parse(test_packet, @encrypted?)
end

test "reads and serializes extension header" do
Expand Down
25 changes: 24 additions & 1 deletion test/membrane/rtp/parser_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,30 @@ defmodule Membrane.RTP.ParserTest do
csrcs: [],
extensions: [],
marker: false,
has_padding?: false,
padding_size: 0,
total_header_size: 12
}
},
payload: Fixtures.sample_packet_payload()
}}}, state}

packet = Fixtures.sample_packet_binary_with_padding()

assert Parser.handle_process(:input, %Buffer{payload: packet}, nil, state) ==
{{:ok,
buffer:
{:output,
%Membrane.Buffer{
metadata: %{
rtp: %{
sequence_number: 3983,
timestamp: 1_653_702_647,
payload_type: 14,
ssrc: 3_919_876_492,
csrcs: [],
extensions: [],
marker: false,
padding_size: 20,
total_header_size: 12
}
},
Expand Down
14 changes: 14 additions & 0 deletions test/membrane/rtp/utils_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Membrane.RTP.UtilsTest do
use ExUnit.Case, async: true

alias Membrane.RTP.Utils

test "Utils.generate_padding/1" do
assert <<>> = Utils.generate_padding(0)
assert <<1>> = Utils.generate_padding(1)
assert <<0::size(99)-unit(8), 100>> = Utils.generate_padding(100)
assert <<0::size(254)-unit(8), 255>> = Utils.generate_padding(255)
assert_raise FunctionClauseError, fn -> Utils.generate_padding(-1) end
assert_raise FunctionClauseError, fn -> Utils.generate_padding(256) end
end
end
2 changes: 1 addition & 1 deletion test/membrane/rtp/vad_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ defmodule Membrane.RTP.VADTest do
data: data
}
],
has_padding?: false,
padding_size: 0,
marker: false,
payload_type: 111,
sequence_number: 16_503,
Expand Down
Loading

0 comments on commit 189ef49

Please sign in to comment.