Skip to content

Commit

Permalink
Merge pull request #82 from membraneframework/handle-extension-ids
Browse files Browse the repository at this point in the history
Add support for handling multiple extension ids
  • Loading branch information
balins authored Nov 30, 2021
2 parents d2114e0 + c015ee0 commit 1a524bf
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 48 deletions.
34 changes: 33 additions & 1 deletion lib/membrane/rtp/header_extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,43 @@ defmodule Membrane.RTP.Header.Extension do
```
"""
alias Membrane.Buffer
alias Membrane.RTP.SessionBin

@enforce_keys [:identifier, :data]
defstruct @enforce_keys

@type identifier_t :: 1..14 | SessionBin.rtp_extension_name_t()

@type t :: %__MODULE__{
identifier: 1..14,
identifier: identifier_t(),
data: binary()
}

@spec pop(Buffer.t(), identifier_t()) :: {t() | nil, Buffer.t()}
def pop(buffer, identifier) do
extension = Enum.find(buffer.metadata.rtp.extensions, &(&1.identifier == identifier))

if extension do
buffer =
Bunch.Struct.update_in(
buffer,
[:metadata, :rtp, :extensions],
&delete(&1, identifier)
)

{extension, buffer}
else
{nil, buffer}
end
end

@spec put(Buffer.t(), t()) :: Buffer.t()
def put(buffer, extension) do
Bunch.Struct.update_in(buffer, [:metadata, :rtp, :extensions], &[extension | &1])
end

defp delete(extensions, identifier) do
Enum.reject(extensions, &(&1.identifier == identifier))
end
end
22 changes: 19 additions & 3 deletions lib/membrane/rtp/outbound_packet_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ defmodule Membrane.RTP.OutboundPacketTracker do
@moduledoc """
Tracks statistics of outband packets.
Besides tracking statistics, tracker can also serialize packet's header and payload stored inside an incoming buffer into
a a proper RTP packet.
Besides tracking statistics, tracker can also serialize packet's header and payload stored inside an incoming buffer
into a proper RTP packet. When encountering header extensions, it remaps its identifiers from locally used extension
names to integer values expected by the receiver.
"""
use Membrane.Filter

Expand All @@ -19,6 +20,7 @@ defmodule Membrane.RTP.OutboundPacketTracker do
def_options ssrc: [spec: RTP.ssrc_t()],
payload_type: [spec: RTP.payload_type_t()],
clock_rate: [spec: RTP.clock_rate_t()],
extension_mapping: [spec: RTP.SessionBin.rtp_extension_mapping_t()],
alignment: [
default: 1,
spec: pos_integer(),
Expand Down Expand Up @@ -65,8 +67,22 @@ defmodule Membrane.RTP.OutboundPacketTracker do

{rtp_metadata, metadata} = Map.pop(buffer.metadata, :rtp, %{})

supported_extensions = Map.keys(state.extension_mapping)

extensions =
rtp_metadata.extensions
|> Enum.filter(fn extension -> extension.identifier in supported_extensions end)
|> Enum.map(fn extension ->
%{extension | identifier: Map.fetch!(state.extension_mapping, extension.identifier)}
end)

header =
struct(RTP.Header, %{rtp_metadata | ssrc: state.ssrc, payload_type: state.payload_type})
struct(RTP.Header, %{
rtp_metadata
| ssrc: state.ssrc,
payload_type: state.payload_type,
extensions: extensions
})

payload =
RTP.Packet.serialize(%RTP.Packet{header: header, payload: buffer.payload},
Expand Down
83 changes: 52 additions & 31 deletions lib/membrane/rtp/session_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,42 @@ defmodule Membrane.RTP.SessionBin do
@type new_stream_notification_t :: Membrane.RTP.SSRCRouter.new_stream_notification_t()

@typedoc """
A module that will be spawned and linked just before a newly created `:output` pad representing
a single RTP stream.
An atom that identifies an RTP extension in the bin. It will be used by the module implementing it
to mark its header extension under `Membrane.RTP.Header.Extension`'s `identifier` key.
"""
@type rtp_extension_name_t :: atom()

@typedoc """
A module representing an RTP extension that will be spawned and linked just before a newly created
`:output` pad representing a single RTP stream.
Given extension config must be a valid `Membrane.Filter`.
An extension will be spawned inside the bin under `{extension_name :: atom(), ssrc}` name.
An extension will be spawned inside the bin under `{extension_name, ssrc}` name.
### Currently supported extensions are:
### Currently supported RTP extensions are:
* `Membrane.RTP.VAD`
### Example usage
`{:vad, %Mebrane.RTP.VAD{time_window: 1_000_000}}`
`{:vad, %Mebrane.RTP.VAD{vad_id: 1, time_window: 1_000_000}}`
"""
@type rtp_extension_options_t ::
{extension_name :: rtp_extension_name_t(),
extension_config :: Membrane.ParentSpec.child_spec_t()}

@typedoc """
A mapping between internally used `rtp_extension_name_t()` and extension identifiers expected by RTP stream receiver.
"""
@type extension_t ::
{extension_name :: atom(), extension_config :: Membrane.ParentSpec.child_spec_t()}
@type rtp_extension_mapping_t :: %{rtp_extension_name_t() => 1..14}

@typedoc """
A definition of a child that will be responsible for arbitrary packet filtering
inside `Membrane.RTP.StreamReceiveBin`. Each filter should have just a single input and output
pad named accordingly.
A definition of a general extension inside `Membrane.RTP.StreamReceiveBin`. Each extension should
have just a single input and output pad named accordingly.
A filter can be responsible e.g. for dropping silent audio packets when encountered VAD extension data in the
packet header.
Extensions can implement different functionalities, for example a filter can be responsible for dropping silent
audio packets when encountered VAD extension data in header extensions of a packet.
"""
@type packet_filter_t :: {Membrane.Child.name_t(), Membrane.ParentSpec.child_spec_t()}
@type extension_t :: {Membrane.Child.name_t(), Membrane.ParentSpec.child_spec_t()}

@ssrc_boundaries 2..(Bitwise.bsl(1, 32) - 1)

Expand Down Expand Up @@ -181,26 +192,26 @@ defmodule Membrane.RTP.SessionBin do
`Membrane.RTP.X.Plugin` where X is the name of codec corresponding to `encoding`.
"""
],
extensions: [
spec: [extension_t()],
rtp_extensions: [
spec: [rtp_extension_options_t()],
default: [],
description: """
List of extensions. Currently `:vad` is only supported.
List of RTP extension options. Currently only `:vad` is supported.
* `:vad` will turn on Voice Activity Detection mechanism firing appropriate notifications when needed.
Should be set only for audio tracks. For more information refer to `Membrane.RTP.VAD` module documentation.
Extensions are applied in the same order as passed to the pad options.
RTP extensions are applied in the same order as passed to the pad options.
"""
],
packet_filters: [
spec: [packet_filter_t()],
extensions: [
spec: [extension_t()],
default: [],
description: """
A list of filter elements that will be attached to the packets flow (added inside `Membrane.RTP.StreamReceiveBin`).
In case of SRTP filters are placed before the Decryptor. The order of provided elements is important
as the filters are applied in FIFO order.
A list of general extensions that will be attached to the packets flow (added inside `Membrane.RTP.StreamReceiveBin`).
In case of SRTP extensions are placed before the Decryptor. The order of provided elements is important
as the extensions are applied in FIFO order.
A filter can be responsible e.g. for dropping silent audio packets when encountered VAD extension data in the
An extension can be responsible e.g. for dropping silent audio packets when encountered VAD extension data in the
packet header.
"""
],
Expand Down Expand Up @@ -236,6 +247,14 @@ defmodule Membrane.RTP.SessionBin do
description: """
Clock rate to use. If not provided, determined from `:payload_type`.
"""
],
rtp_extension_mapping: [
spec: rtp_extension_mapping_t(),
default: nil,
description: """
Mapping from locally used `rtp_extension_name_t()` to integer identifiers expected by
the receiver of a RTP stream.
"""
]
]

Expand Down Expand Up @@ -330,9 +349,9 @@ defmodule Membrane.RTP.SessionBin do
%{
depayloader: depayloader,
clock_rate: clock_rate,
extensions: extensions,
rtp_extensions: rtp_extensions,
rtcp_fir_interval: fir_interval,
packet_filters: filters
extensions: extensions
} = ctx.pads[pad].options

payload_type = Map.fetch!(state.ssrc_pt_mapping, ssrc)
Expand All @@ -346,7 +365,7 @@ defmodule Membrane.RTP.SessionBin do
rtp_stream_name => %RTP.StreamReceiveBin{
clock_rate: clock_rate,
depayloader: depayloader,
filters: filters,
extensions: extensions,
local_ssrc: local_ssrc,
remote_ssrc: ssrc,
rtcp_fir_interval: fir_interval,
Expand All @@ -364,7 +383,7 @@ defmodule Membrane.RTP.SessionBin do
acc = {new_children, router_link}

{new_children, router_link} =
extensions
rtp_extensions
|> Enum.reduce(acc, fn {extension_name, config}, {new_children, new_link} ->
extension_id = {extension_name, ssrc}

Expand All @@ -376,8 +395,7 @@ defmodule Membrane.RTP.SessionBin do

new_links = [router_link |> to_bin_output(pad)]

new_spec = %ParentSpec{children: new_children, links: new_links}
{{:ok, spec: new_spec}, state}
{{:ok, spec: %ParentSpec{children: new_children, links: new_links}}, state}
end

@impl true
Expand All @@ -402,7 +420,9 @@ defmodule Membrane.RTP.SessionBin do
{:ok, state}
else
%{payloader: payloader} = ctx.pads[input_pad].options
%{clock_rate: clock_rate} = ctx.pads[output_pad].options

%{clock_rate: clock_rate, rtp_extension_mapping: rtp_extension_mapping} =
ctx.pads[output_pad].options

payload_type = get_output_payload_type!(ctx, ssrc)
clock_rate = clock_rate || get_from_register!(:clock_rate, payload_type, state)
Expand All @@ -416,7 +436,8 @@ defmodule Membrane.RTP.SessionBin do
ssrc: ssrc,
payload_type: payload_type,
payloader: payloader,
clock_rate: clock_rate
clock_rate: clock_rate,
rtp_extension_mapping: rtp_extension_mapping || %{}
})
|> then(if state.secure?, do: maybe_link_encryptor, else: & &1)
|> to_bin_output(output_pad)
Expand Down
12 changes: 6 additions & 6 deletions lib/membrane/rtp/stream_receive_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ defmodule Membrane.RTP.StreamReceiveBin do
type: :boolean,
default: false
],
filters: [
spec: [Membrane.RTP.SessionBin.packet_filter_t()],
extensions: [
spec: [Membrane.RTP.SessionBin.extension_t()],
default: []
],
clock_rate: [
Expand Down Expand Up @@ -48,7 +48,7 @@ defmodule Membrane.RTP.StreamReceiveBin do

links = [
link_bin_input()
|> to_filters(opts.filters)
|> to_extensions(opts.extensions)
|> to(:rtcp_receiver, %Membrane.RTCP.Receiver{
local_ssrc: opts.local_ssrc,
remote_ssrc: opts.remote_ssrc,
Expand All @@ -71,9 +71,9 @@ defmodule Membrane.RTP.StreamReceiveBin do
{{:ok, spec: spec}, %{}}
end

defp to_filters(link_builder, filters) do
Enum.reduce(filters, link_builder, fn {filter_name, filter}, builder ->
builder |> to(filter_name, filter)
defp to_extensions(link_builder, extensions) do
Enum.reduce(extensions, link_builder, fn {extension_name, extension}, builder ->
builder |> to(extension_name, extension)
end)
end
end
6 changes: 4 additions & 2 deletions lib/membrane/rtp/stream_send_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ defmodule Membrane.RTP.StreamSendBin do
def_options payloader: [default: nil, spec: module],
payload_type: [spec: RTP.payload_type_t()],
ssrc: [spec: RTP.ssrc_t()],
clock_rate: [spec: RTP.clock_rate_t()]
clock_rate: [spec: RTP.clock_rate_t()],
rtp_extension_mapping: [spec: RTP.SessionBin.rtp_extension_mapping_t()]

@impl true
def handle_init(opts) do
Expand All @@ -32,7 +33,8 @@ defmodule Membrane.RTP.StreamSendBin do
|> to(:packet_tracker, %RTP.OutboundPacketTracker{
ssrc: opts.ssrc,
payload_type: opts.payload_type,
clock_rate: opts.clock_rate
clock_rate: opts.clock_rate,
extension_mapping: opts.rtp_extension_mapping || %{}
})
|> to_bin_output()
]
Expand Down
26 changes: 23 additions & 3 deletions lib/membrane/rtp/vad.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule Membrane.RTP.VAD do
"""
use Membrane.Filter

alias Membrane.RTP.Header

def_input_pad :input,
availability: :always,
caps: :any,
Expand All @@ -34,7 +36,11 @@ defmodule Membrane.RTP.VAD do
availability: :always,
caps: :any

def_options clock_rate: [
def_options vad_id: [
spec: 1..14,
description: "ID of VAD header extension."
],
clock_rate: [
spec: Membrane.RTP.clock_rate_t(),
default: 48_000,
description: "Clock rate (in `Hz`) for the encoding."
Expand Down Expand Up @@ -84,6 +90,7 @@ defmodule Membrane.RTP.VAD do
@impl true
def handle_init(opts) do
state = %{
vad_id: opts.vad_id,
audio_levels: Qex.new(),
clock_rate: opts.clock_rate,
vad: :silence,
Expand All @@ -108,8 +115,21 @@ defmodule Membrane.RTP.VAD do

@impl true
def handle_process(:input, %Membrane.Buffer{} = buffer, _ctx, state) do
vad_extension = Enum.find(buffer.metadata.rtp.extensions, &(&1.identifier == 1))
<<_v::1, level::7>> = vad_extension.data
{extension, buffer} = Header.Extension.pop(buffer, state.vad_id)
handle_if_present(buffer, extension, state)
end

defp handle_if_present(buffer, nil, state), do: {{:ok, buffer: {:output, buffer}}, state}

defp handle_if_present(buffer, extension, state) do
<<_v::1, level::7>> = extension.data

new_extension = %Header.Extension{
identifier: :vad,
data: extension.data
}

buffer = Header.Extension.put(buffer, new_extension)

rtp_timestamp = buffer.metadata.rtp.timestamp
epoch = timestamp_epoch(state.current_timestamp, rtp_timestamp)
Expand Down
6 changes: 4 additions & 2 deletions test/membrane/rtp/vad_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule Membrane.RTP.VADTest do
ExUnit.Case.register_attribute(__MODULE__, :vad_silence_time)
ExUnit.Case.register_attribute(__MODULE__, :vad_threshold)

@default_vad_id 1
@default_buffer_interval 20

defp calculate_buffer_time_delta(ctx) do
Expand Down Expand Up @@ -50,6 +51,7 @@ defmodule Membrane.RTP.VADTest do
defp setup_initial_vad_state(ctx) do
{:ok, state} =
VAD.handle_init(%{
vad_id: @default_vad_id,
clock_rate: ctx.clock_rate,
min_packet_num: ctx.min_packet_num,
time_window: ctx.time_window,
Expand All @@ -68,8 +70,8 @@ defmodule Membrane.RTP.VADTest do
csrcs: [],
extensions: [
%Membrane.RTP.Header.Extension{
data: data,
identifier: 1
identifier: @default_vad_id,
data: data
}
],
has_padding?: false,
Expand Down

0 comments on commit 1a524bf

Please sign in to comment.