Skip to content

Commit

Permalink
Merge pull request #107 from membraneframework/feat/fir-pli-handling
Browse files Browse the repository at this point in the history
Feat/fir pli handling
  • Loading branch information
bblaszkow06 authored Jul 22, 2022
2 parents 6a6bbd6 + 88d517f commit 4845a2b
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 77 deletions.
3 changes: 3 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import Config

config :membrane_rtp_plugin, :fir_throttle_duration_ms, 500
17 changes: 10 additions & 7 deletions lib/membrane/rtcp/feedback_packet/afb.ex
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
defmodule Membrane.RTCP.FeedbackPacket.AFB do
@moduledoc """
TODO: Mock module, for now ignores PSFB=206 & PT=15 packets.
Should encode and decode [Application Layer Feedback](https://datatracker.ietf.org/doc/html/rfc4585#section-6.4) packets.
[Application Layer Feedback](https://datatracker.ietf.org/doc/html/rfc4585#section-6.4) packets.
They use PT=PSFB (206) & FMT=15.
Since the message must be handled at the application layer, the struct simply wraps a binary content of message
"""

@behaviour Membrane.RTCP.FeedbackPacket

defstruct []
@enforce_keys [:message]
defstruct @enforce_keys

@impl true
def decode(_binary) do
{:ok, %__MODULE__{}}
def decode(binary) do
{:ok, %__MODULE__{message: binary}}
end

@impl true
def encode(_packet) do
<<>>
def encode(%__MODULE__{message: message}) when is_binary(message) do
message
end
end
68 changes: 48 additions & 20 deletions lib/membrane/rtcp/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ defmodule Membrane.RTCP.Parser do

@impl true
def handle_prepared_to_playing(_ctx, state) do
{{:ok,
caps: {:receiver_report_output, %RemoteStream{type: :packetized, content_format: RTCP}}},
state}
caps = %RemoteStream{type: :packetized, content_format: RTCP}
{{:ok, caps: {:receiver_report_output, caps}}, state}
end

@impl true
Expand All @@ -47,7 +46,7 @@ defmodule Membrane.RTCP.Parser do
{{:ok, actions}, state}

{:error, reason} ->
Membrane.Logger.debug("""
Membrane.Logger.warn("""
Couldn't parse rtcp packet:
#{inspect(payload, limit: :infinity)}
Reason: #{inspect(reason)}. Ignoring packet.
Expand All @@ -70,9 +69,10 @@ defmodule Membrane.RTCP.Parser do
Enum.flat_map(rtcp, &process_rtcp(&1, metadata))
end

defp process_rtcp(%RTCP.FeedbackPacket{payload: %RTCP.FeedbackPacket.PLI{}}, _metadata) do
Membrane.Logger.debug("Received packet loss indicator RTCP packet")
[]
defp process_rtcp(%RTCP.FeedbackPacket{payload: %keyframe_request{}} = packet, metadata)
when keyframe_request in [RTCP.FeedbackPacket.FIR, RTCP.FeedbackPacket.PLI] do
event = to_rtcp_event(packet, packet.target_ssrc, metadata)
[event: {:output, event}]
end

defp process_rtcp(
Expand All @@ -83,27 +83,55 @@ defmodule Membrane.RTCP.Parser do
end

defp process_rtcp(%RTCP.SenderReportPacket{ssrc: ssrc} = packet, metadata) do
event = %RTCPEvent{
rtcp: packet,
ssrcs: [ssrc],
arrival_timestamp: Map.get(metadata, :arrival_ts, Membrane.Time.vm_time())
}

event = to_rtcp_event(packet, ssrc, metadata)
[event: {:output, event}]
end

defp process_rtcp(%RTCP.ReceiverReportPacket{reports: reports}, metadata) do
reports
|> Enum.map(fn report ->
event = %RTCPEvent{
rtcp: report,
ssrcs: [report.ssrc],
arrival_timestamp: Map.get(metadata, :arrival_ts, Membrane.Time.vm_time())
}

event = to_rtcp_event(report, report.ssrc, metadata)
{:event, {:output, event}}
end)
end

defp process_rtcp(_unknown_packet, _metadata), do: []
defp process_rtcp(
%RTCP.FeedbackPacket{
payload: %RTCP.FeedbackPacket.AFB{message: "REMB" <> _remb_data}
},
_metadata
) do
# maybe TODO: handle REMB extension
# Even though we do not support REMB and do not advertise such support in SDP,
# browsers ignore that and send REMB packets for video as part of sender report ¯\_(ツ)_/¯
[]
end

defp process_rtcp(%RTCP.ByePacket{ssrcs: ssrcs}, _metadata) do
Membrane.Logger.debug("SSRCs #{inspect(ssrcs)} are leaving (received RTCP Bye)")
[]
end

defp process_rtcp(%RTCP.SdesPacket{}, _metadata) do
# We don't care about SdesPacket, usually included in compound packet with SenderReportPacket or ReceiverReportPacket
[]
end

defp process_rtcp(unknown_packet, metadata) do
Membrane.Logger.warn("""
Unhandled RTCP packet
#{inspect(unknown_packet, pretty: true, limit: :infinity)}
#{inspect(metadata, pretty: true)}
""")

[]
end

defp to_rtcp_event(rtcp_packet, ssrc, metadata) do
%RTCPEvent{
rtcp: rtcp_packet,
ssrcs: [ssrc],
arrival_timestamp: Map.get(metadata, :arrival_ts, Membrane.Time.vm_time())
}
end
end
64 changes: 38 additions & 26 deletions lib/membrane/rtcp/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,50 @@ defmodule Membrane.RTCP.Receiver do
def_options local_ssrc: [spec: RTP.ssrc_t()],
remote_ssrc: [spec: RTP.ssrc_t()],
report_interval: [spec: Membrane.Time.t() | nil, default: nil],
fir_interval: [spec: Membrane.Time.t() | nil, default: nil],
telemetry_label: [spec: Membrane.TelemetryMetrics.label(), default: []]

@event_name [Membrane.RTP, :rtcp, :fir, :sent]
@fir_throttle_duration Application.compile_env(
:membrane_rtp_plugin,
:fir_throttle_duration_ms,
500
)
|> Membrane.Time.milliseconds()

@fir_telemetry_event [Membrane.RTP, :rtcp, :fir, :sent]

@impl true
def handle_init(opts) do
Membrane.TelemetryMetrics.register(@event_name, opts.telemetry_label)
{:ok, Map.from_struct(opts) |> Map.merge(%{fir_seq_num: 0, sr_info: %{}})}
Membrane.TelemetryMetrics.register(@fir_telemetry_event, opts.telemetry_label)

state =
opts
|> Map.from_struct()
|> Map.merge(%{fir_seq_num: 0, last_fir_timestamp: 0, sr_info: %{}})

{:ok, state}
end

@impl true
def handle_prepared_to_playing(_ctx, state) do
fir_timer =
if state.fir_interval, do: [start_timer: {:fir_timer, state.fir_interval}], else: []

report_timer =
if state.report_interval,
do: [start_timer: {:report_timer, state.report_interval}],
else: []

{{:ok, fir_timer ++ report_timer}, state}
{{:ok, report_timer}, state}
end

@impl true
def handle_playing_to_prepared(_ctx, state) do
fir_timer = if state.fir_interval, do: [stop_timer: :fir_timer], else: []
report_timer = if state.report_interval, do: [stop_timer: :report_timer], else: []
{{:ok, fir_timer ++ report_timer}, state}
{{:ok, report_timer}, state}
end

@impl true
def handle_tick(:report_timer, _ctx, state) do
{{:ok, event: {:output, %ReceiverReport.StatsRequestEvent{}}}, state}
end

@impl true
def handle_tick(:fir_timer, _ctx, state) do
send_fir(state)
end

@impl true
def handle_event(:input, %RTCPEvent{rtcp: %SenderReportPacket{} = rtcp} = event, _ctx, state) do
<<_wallclock_ts_upper_16_bits::16, wallclock_ts_middle_32_bits::32,
Expand All @@ -75,7 +78,8 @@ defmodule Membrane.RTCP.Receiver do
end

@impl true
def handle_event(:input, %RTCPEvent{}, _ctx, state) do
def handle_event(:input, %RTCPEvent{} = event, _ctx, state) do
Membrane.Logger.error("Unexpected RTCPEvent: #{inspect(event)}")
{:ok, state}
end

Expand Down Expand Up @@ -118,18 +122,26 @@ defmodule Membrane.RTCP.Receiver do
end

defp send_fir(state) do
rtcp = %FeedbackPacket{
origin_ssrc: state.local_ssrc,
payload: %FeedbackPacket.FIR{
target_ssrc: state.remote_ssrc,
seq_num: state.fir_seq_num
now = Time.vm_time()

if now - state.last_fir_timestamp > @fir_throttle_duration do
rtcp = %FeedbackPacket{
origin_ssrc: state.local_ssrc,
payload: %FeedbackPacket.FIR{
target_ssrc: state.remote_ssrc,
seq_num: state.fir_seq_num
}
}
}

Membrane.TelemetryMetrics.execute(@event_name, %{}, %{}, state.telemetry_label)
Membrane.TelemetryMetrics.execute(@fir_telemetry_event, %{}, %{}, state.telemetry_label)

event = %RTCPEvent{rtcp: rtcp}
state = Map.update!(state, :fir_seq_num, &(&1 + 1))
{{:ok, event: {:input, event}}, state}
event = %RTCPEvent{rtcp: rtcp}
state = %{state | fir_seq_num: state.fir_seq_num + 1, last_fir_timestamp: now}
Membrane.Logger.info("Sending FIR to #{state.remote_ssrc}")
{{:ok, event: {:input, event}}, state}
else
Membrane.Logger.debug("Not sending FIR to #{state.remote_ssrc} due to throttling")
{:ok, state}
end
end
end
7 changes: 3 additions & 4 deletions lib/membrane/rtcp/transport_feedback_packet/twcc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do
<<base_seq_num::16, packet_status_count::16, reference_time::signed-integer-size(24),
feedback_packet_count::8>>

encoded_packet_status_chunks =
Enum.map_join(packet_status_chunks, &encode_packet_status_chunk/1)
encoded_packet_status_chunks = Enum.map(packet_status_chunks, &encode_packet_status_chunk/1)

encoded_receive_deltas = Enum.map_join(scaled_receive_deltas, &encode_receive_delta/1)
encoded_receive_deltas = Enum.map(scaled_receive_deltas, &encode_receive_delta/1)

[encoded_header, encoded_packet_status_chunks, encoded_receive_deltas]
|> Enum.join()
|> IO.iodata_to_binary()
|> maybe_add_padding()
end

Expand Down
24 changes: 23 additions & 1 deletion lib/membrane/rtp/outbound_packet_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ defmodule Membrane.RTP.OutboundPacketTracker do
"""
use Membrane.Filter

alias Membrane.{Buffer, RTP, Payload, Time}
alias Membrane.{Buffer, RTCPEvent, RTP, Payload, Time}
alias Membrane.RTCP.FeedbackPacket.{PLI, FIR}
alias Membrane.RTP.Session.SenderReport

require Membrane.Logger

def_input_pad :input, caps: :any, demand_mode: :auto

def_output_pad :output, caps: :any, demand_mode: :auto
Expand Down Expand Up @@ -91,6 +94,25 @@ defmodule Membrane.RTP.OutboundPacketTracker do
raise "rtcp_output pad can get linked just once"
end

@impl true
def handle_event(
Pad.ref(:rtcp_input, _id),
%RTCPEvent{rtcp: %{payload: %keyframe_request{}}},
_ctx,
state
)
when keyframe_request in [PLI, FIR] do
# PLI or FIR reaching OutboundPacketTracker means the receiving peer sent it
# We need to pass it to the sending peer's RTCP.Receiver (in StreamReceiveBin) to get translated again into FIR/PLI with proper SSRCs
# and then sent to the sender. So the KeyframeRequestEvent, like salmon, starts an upstream journey here trying to reach that peer.
{{:ok, event: {:input, %Membrane.KeyframeRequestEvent{}}}, state}
end

@impl true
def handle_event(pad, event, ctx, state) do
super(pad, event, ctx, state)
end

@impl true
def handle_process(:input, %Buffer{} = buffer, _ctx, state) do
state = update_stats(buffer, state)
Expand Down
13 changes: 3 additions & 10 deletions lib/membrane/rtp/session_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Membrane.RTP.SessionBin do
## RTCP
RTCP packets for inbound stream can be provided either in-band or via a separate `rtp_input` pad instance. Corresponding
receiver report packets will be sent back through `rtcp_output` with the same id as `rtp_input` for the RTP stream.
receiver report packets will be sent back through `rtcp_receiver_output` with the same id as `rtp_input` for the RTP stream.
RTCP for outbound stream is not yet supported. # But will be :)
"""
Expand Down Expand Up @@ -239,11 +239,6 @@ defmodule Membrane.RTP.SessionBin do
An extension can be responsible e.g. for dropping silent audio packets when encountered VAD extension data in the
packet header.
"""
],
rtcp_fir_interval: [
spec: Membrane.Time.t() | nil,
default: Membrane.Time.second(),
description: "Interval between sending subseqent RTCP Full Intra Request packets."
]
]

Expand Down Expand Up @@ -391,7 +386,6 @@ defmodule Membrane.RTP.SessionBin do
depayloader: depayloader,
clock_rate: clock_rate,
rtp_extensions: rtp_extensions,
rtcp_fir_interval: fir_interval,
encoding: encoding,
telemetry_label: telemetry_label,
extensions: extensions
Expand All @@ -412,7 +406,6 @@ defmodule Membrane.RTP.SessionBin do
local_ssrc: local_ssrc,
remote_ssrc: ssrc,
rtcp_report_interval: state.rtcp_receiver_report_interval,
rtcp_fir_interval: fir_interval,
telemetry_label: telemetry_label,
secure?: state.secure?,
srtp_policies: state.srtp_policies
Expand Down Expand Up @@ -514,7 +507,7 @@ defmodule Membrane.RTP.SessionBin do
# if RTCP is present create all set of input and output pads for RTCP flow
rtcp_links =
if rtcp? do
maybe_link_srtcp_encryptor =
link_srtcp_encryptor =
&to(
&1,
{:srtcp_sender_encryptor, ssrc},
Expand All @@ -526,7 +519,7 @@ defmodule Membrane.RTP.SessionBin do
[
link({:stream_send_bin, ssrc})
|> via_out(:rtcp_output)
|> then(if state.secure?, do: maybe_link_srtcp_encryptor, else: & &1)
|> then(if state.secure?, do: link_srtcp_encryptor, else: & &1)
|> to_bin_output(rtcp_sender_output),
link(:ssrc_router)
|> via_out(Pad.ref(:output, ssrc))
Expand Down
15 changes: 13 additions & 2 deletions lib/membrane/rtp/ssrc_router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Membrane.RTP.SSRCRouter do

alias Membrane.{RTCP, RTP, RTCPEvent, SRTP}

require Membrane.Logger
require Membrane.TelemetryMetrics

@packet_arrival_event [Membrane.RTP, :packet, :arrival]
Expand Down Expand Up @@ -138,8 +139,18 @@ defmodule Membrane.RTP.SSRCRouter do
def handle_event(Pad.ref(:input, _id), %RTCPEvent{} = event, ctx, state) do
actions =
event.ssrcs
|> Enum.map(&{:event, {Pad.ref(:output, &1), event}})
|> Enum.filter(fn {:event, {pad, _event}} -> Map.has_key?(ctx.pads, pad) end)
|> Enum.flat_map(fn ssrc ->
target_pad = Pad.ref(:output, ssrc)

if Map.has_key?(ctx.pads, target_pad) do
[event: {target_pad, event}]
else
# TODO: This should most likely be a warning, however it appears on every join and leave,
# So until it's fixed, it is reported with debug log level
Membrane.Logger.debug("Received event (#{inspect(event)}) for unknown SSRC: #{ssrc}")
[]
end
end)

{{:ok, actions}, state}
end
Expand Down
Loading

0 comments on commit 4845a2b

Please sign in to comment.