From fd2d857e609b00f752f2e94faa30db075a027bcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Thu, 30 Jun 2022 17:27:47 +0200 Subject: [PATCH 01/13] Parse PLI, FIR & AFB RTCP packets --- lib/membrane/rtcp/feedback_packet/afb.ex | 17 +++--- lib/membrane/rtcp/parser.ex | 66 +++++++++++++++++------- 2 files changed, 56 insertions(+), 27 deletions(-) diff --git a/lib/membrane/rtcp/feedback_packet/afb.ex b/lib/membrane/rtcp/feedback_packet/afb.ex index 68a60183..c28d5ed1 100644 --- a/lib/membrane/rtcp/feedback_packet/afb.ex +++ b/lib/membrane/rtcp/feedback_packet/afb.ex @@ -1,20 +1,23 @@ defmodule Membrane.RTCP.FeedbackPacket.AFB do @moduledoc """ - TODO: Mock module, for now ignores PSFB=206 & PT=15 packets. - Should encode and decode [Application Layer Feedback](https://datatracker.ietf.org/doc/html/rfc4585#section-6.4) packets. + [Application Layer Feedback](https://datatracker.ietf.org/doc/html/rfc4585#section-6.4) packets. + + They use PT=PSFB (206) & FMT=15. + Since the message must be handled at the application layer, the struct simply wraps a binary content of message """ @behaviour Membrane.RTCP.FeedbackPacket - defstruct [] + @enforce_keys [:message] + defstruct @enforce_keys @impl true - def decode(_binary) do - {:ok, %__MODULE__{}} + def decode(binary) do + {:ok, %__MODULE__{message: binary}} end @impl true - def encode(_packet) do - <<>> + def encode(%__MODULE__{message: message}) when is_binary(message) do + message end end diff --git a/lib/membrane/rtcp/parser.ex b/lib/membrane/rtcp/parser.ex index 74389664..d68a4eb1 100644 --- a/lib/membrane/rtcp/parser.ex +++ b/lib/membrane/rtcp/parser.ex @@ -47,7 +47,7 @@ defmodule Membrane.RTCP.Parser do {{:ok, actions}, state} {:error, reason} -> - Membrane.Logger.debug(""" + Membrane.Logger.warn(""" Couldn't parse rtcp packet: #{inspect(payload, limit: :infinity)} Reason: #{inspect(reason)}. Ignoring packet. @@ -63,16 +63,14 @@ defmodule Membrane.RTCP.Parser do {{:ok, buffer: {:receiver_report_output, buffer}}, state} end - @impl true - def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state) - defp process_packets(rtcp, metadata) do Enum.flat_map(rtcp, &process_rtcp(&1, metadata)) end - defp process_rtcp(%RTCP.FeedbackPacket{payload: %RTCP.FeedbackPacket.PLI{}}, _metadata) do - Membrane.Logger.debug("Received packet loss indicator RTCP packet") - [] + defp process_rtcp(%RTCP.FeedbackPacket{payload: %keyframe_request{}} = packet, metadata) + when keyframe_request in [RTCP.FeedbackPacket.FIR, RTCP.FeedbackPacket.PLI] do + event = wrap_with_rtcp_event(packet, packet.target_ssrc, metadata) + [event: {:output, event}] end defp process_rtcp( @@ -83,27 +81,55 @@ defmodule Membrane.RTCP.Parser do end defp process_rtcp(%RTCP.SenderReportPacket{ssrc: ssrc} = packet, metadata) do - event = %RTCPEvent{ - rtcp: packet, - ssrcs: [ssrc], - arrival_timestamp: Map.get(metadata, :arrival_ts, Membrane.Time.vm_time()) - } - + event = wrap_with_rtcp_event(packet, ssrc, metadata) [event: {:output, event}] end defp process_rtcp(%RTCP.ReceiverReportPacket{reports: reports}, metadata) do reports |> Enum.map(fn report -> - event = %RTCPEvent{ - rtcp: report, - ssrcs: [report.ssrc], - arrival_timestamp: Map.get(metadata, :arrival_ts, Membrane.Time.vm_time()) - } - + event = wrap_with_rtcp_event(report, report.ssrc, metadata) {:event, {:output, event}} end) end - defp process_rtcp(_unknown_packet, _metadata), do: [] + defp process_rtcp( + %RTCP.FeedbackPacket{ + payload: %RTCP.FeedbackPacket.AFB{message: "REMB" <> _remb_data} + }, + _metadata + ) do + # maybe TODO: handle REMB extension + # Even though we do not support REMB and do not advertise such support in SDP, + # browsers ignore that and send REMP packets for video ¯\_(ツ)_/¯ + [] + end + + defp process_rtcp(%RTCP.ByePacket{ssrcs: ssrcs}, _metadata) do + Membrane.Logger.debug("SSRCs #{inspect(ssrcs)} are leaving (received RTCP Bye)") + [] + end + + defp process_rtcp(%RTCP.SdesPacket{}, _metadata) do + # We don't care about SdesPacket, usually included in compound packet with SenderReportPacket or ReceiverReportPacket + [] + end + + defp process_rtcp(unknown_packet, metadata) do + Membrane.Logger.warn(""" + Unhandled RTCP packet + #{inspect(unknown_packet, pretty: true, limit: :infinity)} + #{inspect(metadata, pretty: true)} + """) + + [] + end + + defp wrap_with_rtcp_event(rtcp_packet, ssrc, metadata) do + %RTCPEvent{ + rtcp: rtcp_packet, + ssrcs: [ssrc], + arrival_timestamp: Map.get(metadata, :arrival_ts, Membrane.Time.vm_time()) + } + end end From 6be1b0782b800d1124dc791a02cd169aaa2ddbd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Fri, 1 Jul 2022 16:28:29 +0200 Subject: [PATCH 02/13] SessionBin docs fix --- lib/membrane/rtp/session_bin.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/membrane/rtp/session_bin.ex b/lib/membrane/rtp/session_bin.ex index c17f3228..2750c6c1 100644 --- a/lib/membrane/rtp/session_bin.ex +++ b/lib/membrane/rtp/session_bin.ex @@ -38,7 +38,7 @@ defmodule Membrane.RTP.SessionBin do ## RTCP RTCP packets for inbound stream can be provided either in-band or via a separate `rtp_input` pad instance. Corresponding - receiver report packets will be sent back through `rtcp_output` with the same id as `rtp_input` for the RTP stream. + receiver report packets will be sent back through `rtcp_receiver_output` with the same id as `rtp_input` for the RTP stream. RTCP for outbound stream is not yet supported. # But will be :) """ From 20d32971e2c2064bd1fd736adbad73df2ac36019 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Fri, 1 Jul 2022 16:29:08 +0200 Subject: [PATCH 03/13] Small TWCC.encode optimization --- lib/membrane/rtcp/transport_feedback_packet/twcc.ex | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex index 8124a028..031deac4 100644 --- a/lib/membrane/rtcp/transport_feedback_packet/twcc.ex +++ b/lib/membrane/rtcp/transport_feedback_packet/twcc.ex @@ -106,13 +106,12 @@ defmodule Membrane.RTCP.TransportFeedbackPacket.TWCC do <> - encoded_packet_status_chunks = - Enum.map_join(packet_status_chunks, &encode_packet_status_chunk/1) + encoded_packet_status_chunks = Enum.map(packet_status_chunks, &encode_packet_status_chunk/1) - encoded_receive_deltas = Enum.map_join(scaled_receive_deltas, &encode_receive_delta/1) + encoded_receive_deltas = Enum.map(scaled_receive_deltas, &encode_receive_delta/1) [encoded_header, encoded_packet_status_chunks, encoded_receive_deltas] - |> Enum.join() + |> IO.iodata_to_binary() |> maybe_add_padding() end From e18328b87e75e0017cee04d1c2800403e10a36c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Fri, 1 Jul 2022 17:03:39 +0200 Subject: [PATCH 04/13] Send KeyframeRequestEvent on PLI & FIR --- lib/membrane/rtp/outbound_packet_tracker.ex | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/lib/membrane/rtp/outbound_packet_tracker.ex b/lib/membrane/rtp/outbound_packet_tracker.ex index 23c65885..09475a41 100644 --- a/lib/membrane/rtp/outbound_packet_tracker.ex +++ b/lib/membrane/rtp/outbound_packet_tracker.ex @@ -8,9 +8,12 @@ defmodule Membrane.RTP.OutboundPacketTracker do """ use Membrane.Filter - alias Membrane.{Buffer, RTP, Payload, Time} + alias Membrane.{Buffer, RTCPEvent, RTP, Payload, Time} + alias Membrane.RTCP.FeedbackPacket.{PLI, FIR} alias Membrane.RTP.Session.SenderReport + require Membrane.Logger + def_input_pad :input, caps: :any, demand_mode: :auto def_output_pad :output, caps: :any, demand_mode: :auto @@ -91,6 +94,21 @@ defmodule Membrane.RTP.OutboundPacketTracker do raise "rtcp_output pad can get linked just once" end + @impl true + def handle_event( + Pad.ref(:rtcp_input, _id), + %RTCPEvent{rtcp: %{payload: %keyframe_request{}}}, + _ctx, + state + ) + when keyframe_request in [PLI, FIR] do + {{:ok, event: {:input, %Membrane.KeyframeRequestEvent{}}}, state} + end + + def handle_event(pad, event, ctx, state) do + super(pad, event, ctx, state) + end + @impl true def handle_process(:input, %Buffer{} = buffer, _ctx, state) do state = update_stats(buffer, state) From fc6cd187420eb5f9d1f09f35d1e73be3f1b1de0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Tue, 5 Jul 2022 12:57:58 +0200 Subject: [PATCH 05/13] Log RTCP events for unknown SSRC --- lib/membrane/rtp/ssrc_router.ex | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/membrane/rtp/ssrc_router.ex b/lib/membrane/rtp/ssrc_router.ex index b345ae4c..da383314 100644 --- a/lib/membrane/rtp/ssrc_router.ex +++ b/lib/membrane/rtp/ssrc_router.ex @@ -15,6 +15,7 @@ defmodule Membrane.RTP.SSRCRouter do alias Membrane.{RTCP, RTP, RTCPEvent, SRTP} + require Membrane.Logger require Membrane.TelemetryMetrics @packet_arrival_event [Membrane.RTP, :packet, :arrival] @@ -138,8 +139,18 @@ defmodule Membrane.RTP.SSRCRouter do def handle_event(Pad.ref(:input, _id), %RTCPEvent{} = event, ctx, state) do actions = event.ssrcs - |> Enum.map(&{:event, {Pad.ref(:output, &1), event}}) - |> Enum.filter(fn {:event, {pad, _event}} -> Map.has_key?(ctx.pads, pad) end) + |> Enum.flat_map(fn ssrc -> + target_pad = Pad.ref(:output, ssrc) + + if Map.has_key?(ctx.pads, target_pad) do + [event: {target_pad, event}] + else + # TODO: This should most likely be a warning, however it appears on every join and leave, + # So until it's fixed, it is reported with debug log level + Membrane.Logger.debug("Received event (#{inspect(event)}) for unknown SSRC: #{ssrc}") + [] + end + end) {{:ok, actions}, state} end From ca8a29b94e38204cd60043364d995502be5aa22a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Tue, 5 Jul 2022 15:11:04 +0200 Subject: [PATCH 06/13] Add FIR throttling --- lib/membrane/rtcp/receiver.ex | 49 +++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/lib/membrane/rtcp/receiver.ex b/lib/membrane/rtcp/receiver.ex index 039a3f56..5c32548d 100644 --- a/lib/membrane/rtcp/receiver.ex +++ b/lib/membrane/rtcp/receiver.ex @@ -22,12 +22,24 @@ defmodule Membrane.RTCP.Receiver do fir_interval: [spec: Membrane.Time.t() | nil, default: nil], telemetry_label: [spec: Membrane.TelemetryMetrics.label(), default: []] - @event_name [Membrane.RTP, :rtcp, :fir, :sent] + @fir_trottle_duration Application.compile_env( + :membrane_videoroom, + :fir_throttle_duration, + 500 |> Time.milliseconds() + ) + + @fir_telemetry_event [Membrane.RTP, :rtcp, :fir, :sent] @impl true def handle_init(opts) do - Membrane.TelemetryMetrics.register(@event_name, opts.telemetry_label) - {:ok, Map.from_struct(opts) |> Map.merge(%{fir_seq_num: 0, sr_info: %{}})} + Membrane.TelemetryMetrics.register(@fir_telemetry_event, opts.telemetry_label) + + state = + opts + |> Map.from_struct() + |> Map.merge(%{fir_seq_num: 0, last_fir_timestamp: 0, sr_info: %{}}) + + {:ok, state} end @impl true @@ -75,7 +87,8 @@ defmodule Membrane.RTCP.Receiver do end @impl true - def handle_event(:input, %RTCPEvent{}, _ctx, state) do + def handle_event(:input, %RTCPEvent{} = event, _ctx, state) do + Membrane.Logger.error("Unexpected RTCPEvent: #{inspect(event)}") {:ok, state} end @@ -118,18 +131,26 @@ defmodule Membrane.RTCP.Receiver do end defp send_fir(state) do - rtcp = %FeedbackPacket{ - origin_ssrc: state.local_ssrc, - payload: %FeedbackPacket.FIR{ - target_ssrc: state.remote_ssrc, - seq_num: state.fir_seq_num + now = Time.vm_time() + + if now - state.last_fir_timestamp > @fir_trottle_duration do + rtcp = %FeedbackPacket{ + origin_ssrc: state.local_ssrc, + payload: %FeedbackPacket.FIR{ + target_ssrc: state.remote_ssrc, + seq_num: state.fir_seq_num + } } - } - Membrane.TelemetryMetrics.execute(@event_name, %{}, %{}, state.telemetry_label) + Membrane.TelemetryMetrics.execute(@fir_telemetry_event, %{}, %{}, state.telemetry_label) - event = %RTCPEvent{rtcp: rtcp} - state = Map.update!(state, :fir_seq_num, &(&1 + 1)) - {{:ok, event: {:input, event}}, state} + event = %RTCPEvent{rtcp: rtcp} + state = %{state | fir_seq_num: state.fir_seq_num + 1, last_fir_timestamp: now} + Membrane.Logger.info("Sending FIR to #{state.remote_ssrc}") + {{:ok, event: {:input, event}}, state} + else + Membrane.Logger.debug("Not sending FIR to #{state.remote_ssrc} due to throttling") + {:ok, state} + end end end From 40f7743088dc318fd417c77c5f8b38657c491fe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Tue, 5 Jul 2022 15:15:38 +0200 Subject: [PATCH 07/13] Rename the linking function in session_bin --- lib/membrane/rtp/session_bin.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/membrane/rtp/session_bin.ex b/lib/membrane/rtp/session_bin.ex index 2750c6c1..69aee9e8 100644 --- a/lib/membrane/rtp/session_bin.ex +++ b/lib/membrane/rtp/session_bin.ex @@ -514,7 +514,7 @@ defmodule Membrane.RTP.SessionBin do # if RTCP is present create all set of input and output pads for RTCP flow rtcp_links = if rtcp? do - maybe_link_srtcp_encryptor = + link_srtcp_encryptor = &to( &1, {:srtcp_sender_encryptor, ssrc}, @@ -526,7 +526,7 @@ defmodule Membrane.RTP.SessionBin do [ link({:stream_send_bin, ssrc}) |> via_out(:rtcp_output) - |> then(if state.secure?, do: maybe_link_srtcp_encryptor, else: & &1) + |> then(if state.secure?, do: link_srtcp_encryptor, else: & &1) |> to_bin_output(rtcp_sender_output), link(:ssrc_router) |> via_out(Pad.ref(:output, ssrc)) From 20f3777049a6238653ecb0e21aeccbc7784d8c3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Tue, 5 Jul 2022 15:50:39 +0200 Subject: [PATCH 08/13] Get rid of FIR interval --- lib/membrane/rtcp/receiver.ex | 14 ++------------ lib/membrane/rtp/session_bin.ex | 7 ------- lib/membrane/rtp/stream_receive_bin.ex | 2 -- test/membrane/rtp/session_bin_test.exs | 2 +- test/membrane/rtp/stream_receive_bin_test.exs | 6 ++---- 5 files changed, 5 insertions(+), 26 deletions(-) diff --git a/lib/membrane/rtcp/receiver.ex b/lib/membrane/rtcp/receiver.ex index 5c32548d..c7bc24ee 100644 --- a/lib/membrane/rtcp/receiver.ex +++ b/lib/membrane/rtcp/receiver.ex @@ -19,7 +19,6 @@ defmodule Membrane.RTCP.Receiver do def_options local_ssrc: [spec: RTP.ssrc_t()], remote_ssrc: [spec: RTP.ssrc_t()], report_interval: [spec: Membrane.Time.t() | nil, default: nil], - fir_interval: [spec: Membrane.Time.t() | nil, default: nil], telemetry_label: [spec: Membrane.TelemetryMetrics.label(), default: []] @fir_trottle_duration Application.compile_env( @@ -44,22 +43,18 @@ defmodule Membrane.RTCP.Receiver do @impl true def handle_prepared_to_playing(_ctx, state) do - fir_timer = - if state.fir_interval, do: [start_timer: {:fir_timer, state.fir_interval}], else: [] - report_timer = if state.report_interval, do: [start_timer: {:report_timer, state.report_interval}], else: [] - {{:ok, fir_timer ++ report_timer}, state} + {{:ok, report_timer}, state} end @impl true def handle_playing_to_prepared(_ctx, state) do - fir_timer = if state.fir_interval, do: [stop_timer: :fir_timer], else: [] report_timer = if state.report_interval, do: [stop_timer: :report_timer], else: [] - {{:ok, fir_timer ++ report_timer}, state} + {{:ok, report_timer}, state} end @impl true @@ -67,11 +62,6 @@ defmodule Membrane.RTCP.Receiver do {{:ok, event: {:output, %ReceiverReport.StatsRequestEvent{}}}, state} end - @impl true - def handle_tick(:fir_timer, _ctx, state) do - send_fir(state) - end - @impl true def handle_event(:input, %RTCPEvent{rtcp: %SenderReportPacket{} = rtcp} = event, _ctx, state) do <<_wallclock_ts_upper_16_bits::16, wallclock_ts_middle_32_bits::32, diff --git a/lib/membrane/rtp/session_bin.ex b/lib/membrane/rtp/session_bin.ex index 69aee9e8..fb4474bf 100644 --- a/lib/membrane/rtp/session_bin.ex +++ b/lib/membrane/rtp/session_bin.ex @@ -239,11 +239,6 @@ defmodule Membrane.RTP.SessionBin do An extension can be responsible e.g. for dropping silent audio packets when encountered VAD extension data in the packet header. """ - ], - rtcp_fir_interval: [ - spec: Membrane.Time.t() | nil, - default: Membrane.Time.second(), - description: "Interval between sending subseqent RTCP Full Intra Request packets." ] ] @@ -391,7 +386,6 @@ defmodule Membrane.RTP.SessionBin do depayloader: depayloader, clock_rate: clock_rate, rtp_extensions: rtp_extensions, - rtcp_fir_interval: fir_interval, encoding: encoding, telemetry_label: telemetry_label, extensions: extensions @@ -412,7 +406,6 @@ defmodule Membrane.RTP.SessionBin do local_ssrc: local_ssrc, remote_ssrc: ssrc, rtcp_report_interval: state.rtcp_receiver_report_interval, - rtcp_fir_interval: fir_interval, telemetry_label: telemetry_label, secure?: state.secure?, srtp_policies: state.srtp_policies diff --git a/lib/membrane/rtp/stream_receive_bin.ex b/lib/membrane/rtp/stream_receive_bin.ex index 00d6fbbf..6983d17d 100644 --- a/lib/membrane/rtp/stream_receive_bin.ex +++ b/lib/membrane/rtp/stream_receive_bin.ex @@ -30,7 +30,6 @@ defmodule Membrane.RTP.StreamReceiveBin do local_ssrc: [spec: Membrane.RTP.ssrc_t()], remote_ssrc: [spec: Membrane.RTP.ssrc_t()], rtcp_report_interval: [spec: Membrane.Time.t() | nil], - rtcp_fir_interval: [spec: Membrane.Time.t() | nil], telemetry_label: [ spec: [{atom(), any()}], default: [] @@ -60,7 +59,6 @@ defmodule Membrane.RTP.StreamReceiveBin do local_ssrc: opts.local_ssrc, remote_ssrc: opts.remote_ssrc, report_interval: opts.rtcp_report_interval, - fir_interval: opts.rtcp_fir_interval, telemetry_label: opts.telemetry_label }) |> to(:packet_tracker, %Membrane.RTP.InboundPacketTracker{ diff --git a/test/membrane/rtp/session_bin_test.exs b/test/membrane/rtp/session_bin_test.exs index 5b080436..0721b5d9 100644 --- a/test/membrane/rtp/session_bin_test.exs +++ b/test/membrane/rtp/session_bin_test.exs @@ -195,7 +195,7 @@ defmodule Membrane.RTP.Session.BinTest do links: [ link(:rtp) |> via_out(Pad.ref(:output, ssrc), - options: [depayloader: depayloader, rtcp_fir_interval: nil] + options: [depayloader: depayloader] ) |> to({:sink, ssrc}) ] diff --git a/test/membrane/rtp/stream_receive_bin_test.exs b/test/membrane/rtp/stream_receive_bin_test.exs index 659c48c4..bf64aaeb 100644 --- a/test/membrane/rtp/stream_receive_bin_test.exs +++ b/test/membrane/rtp/stream_receive_bin_test.exs @@ -44,8 +44,7 @@ defmodule Membrane.RTP.StreamReceiveBinTest do depayloader: H264.Depayloader, remote_ssrc: @ssrc, local_ssrc: 0, - rtcp_report_interval: Membrane.Time.seconds(5), - rtcp_fir_interval: nil + rtcp_report_interval: Membrane.Time.seconds(5) }, video_parser: %Membrane.H264.FFmpeg.Parser{framerate: {30, 1}}, frame_counter: FrameCounter @@ -86,8 +85,7 @@ defmodule Membrane.RTP.StreamReceiveBinTest do depayloader: H264.Depayloader, local_ssrc: 0, remote_ssrc: 4_194_443_425, - rtcp_report_interval: Membrane.Time.seconds(5), - rtcp_fir_interval: nil + rtcp_report_interval: Membrane.Time.seconds(5) }, sink: Testing.Sink ] From fc412443572c2689f9e386c9ec085054b670a98d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Wed, 6 Jul 2022 11:47:10 +0200 Subject: [PATCH 09/13] Add AFB & FIR sending tests --- config/config.exs | 3 + lib/membrane/rtcp/receiver.ex | 13 +-- test/membrane/rtcp/afb_packet_test.exs | 20 +++++ test/membrane/rtp/stream_receive_bin_test.exs | 85 +++++++++++++++++++ 4 files changed, 115 insertions(+), 6 deletions(-) create mode 100644 config/config.exs create mode 100644 test/membrane/rtcp/afb_packet_test.exs diff --git a/config/config.exs b/config/config.exs new file mode 100644 index 00000000..f5ae078d --- /dev/null +++ b/config/config.exs @@ -0,0 +1,3 @@ +import Config + +config :membrane_rtp_plugin, :fir_throttle_duration_ms, 500 diff --git a/lib/membrane/rtcp/receiver.ex b/lib/membrane/rtcp/receiver.ex index c7bc24ee..792d09db 100644 --- a/lib/membrane/rtcp/receiver.ex +++ b/lib/membrane/rtcp/receiver.ex @@ -21,11 +21,12 @@ defmodule Membrane.RTCP.Receiver do report_interval: [spec: Membrane.Time.t() | nil, default: nil], telemetry_label: [spec: Membrane.TelemetryMetrics.label(), default: []] - @fir_trottle_duration Application.compile_env( - :membrane_videoroom, - :fir_throttle_duration, - 500 |> Time.milliseconds() - ) + @fir_throttle_duration Application.compile_env( + :membrane_rtp_plugin, + :fir_throttle_duration_ms, + 500 + ) + |> Membrane.Time.milliseconds() @fir_telemetry_event [Membrane.RTP, :rtcp, :fir, :sent] @@ -123,7 +124,7 @@ defmodule Membrane.RTCP.Receiver do defp send_fir(state) do now = Time.vm_time() - if now - state.last_fir_timestamp > @fir_trottle_duration do + if now - state.last_fir_timestamp > @fir_throttle_duration do rtcp = %FeedbackPacket{ origin_ssrc: state.local_ssrc, payload: %FeedbackPacket.FIR{ diff --git a/test/membrane/rtcp/afb_packet_test.exs b/test/membrane/rtcp/afb_packet_test.exs new file mode 100644 index 00000000..9c195280 --- /dev/null +++ b/test/membrane/rtcp/afb_packet_test.exs @@ -0,0 +1,20 @@ +defmodule Membrane.RTCP.FeedbackPacket.AFBTest do + use ExUnit.Case, async: true + + alias Membrane.RTCP.{Packet, FeedbackPacket, SdesPacket, SenderReportPacket} + + # Real, compound, RTCP packet from browser containing SR, Sdes & REMB + @remb_rtcp_packet <<128, 200, 0, 6, 120, 61, 239, 185, 230, 110, 197, 157, 82, 42, 144, 205, + 172, 85, 146, 216, 0, 0, 3, 121, 0, 14, 109, 134, 129, 202, 0, 6, 120, 61, + 239, 185, 1, 16, 116, 79, 97, 68, 55, 57, 102, 72, 120, 105, 119, 102, 110, + 56, 120, 85, 0, 0, 143, 206, 0, 5, 120, 61, 239, 185, 0, 0, 0, 0, 82, 69, + 77, 66, 1, 15, 36, 147, 224, 97, 78, 29>> + + test "AFB Packet can be parsed" do + assert {:ok, [%SenderReportPacket{}, %SdesPacket{}, %FeedbackPacket{payload: fb_payload}]} = + Packet.parse(@remb_rtcp_packet) + + assert %FeedbackPacket.AFB{message: binary} = fb_payload + assert "REMB" <> _rest = binary + end +end diff --git a/test/membrane/rtp/stream_receive_bin_test.exs b/test/membrane/rtp/stream_receive_bin_test.exs index bf64aaeb..ae9307ee 100644 --- a/test/membrane/rtp/stream_receive_bin_test.exs +++ b/test/membrane/rtp/stream_receive_bin_test.exs @@ -3,6 +3,7 @@ defmodule Membrane.RTP.StreamReceiveBinTest do import Membrane.Testing.Assertions + alias Membrane.RTCP.FeedbackPacket alias Membrane.RTP alias Membrane.RTP.StreamReceiveBin alias Membrane.RTP.H264 @@ -12,6 +13,10 @@ defmodule Membrane.RTP.StreamReceiveBinTest do @frames_count 1038 @ssrc 790_688_045 @h264_clock_rate 90_000 + @fir_throttle_duration_ms Application.compile_env!( + :membrane_rtp_plugin, + :fir_throttle_duration_ms + ) defmodule FrameCounter do use Membrane.Sink @@ -100,4 +105,84 @@ defmodule Membrane.RTP.StreamReceiveBinTest do assert_end_of_stream(pipeline, :sink) Testing.Pipeline.terminate(pipeline, blocking?: true) end + + defmodule NoopSource do + use Membrane.Source + + def_output_pad :output, mode: :push, caps: :any + + @impl true + def handle_event(:output, event, _ctx, state) do + {{:ok, notify: event}, state} + end + end + + defmodule KeyframeRequester do + use Membrane.Sink + + def_input_pad :input, demand_unit: :buffers, caps: :any + + def_options delay: [spec: integer()] + + @impl true + def handle_init(%{delay: delay}) do + {:ok, delay} + end + + @impl true + def handle_prepared_to_playing(_ctx, delay) do + keyframe_request = {:event, {:input, %Membrane.KeyframeRequestEvent{}}} + Process.send_after(self(), keyframe_request, delay) + {{:ok, [keyframe_request, keyframe_request]}, delay} + end + + @impl true + def handle_other(keyframe_request, _ctx, delay) do + {{:ok, [keyframe_request, keyframe_request]}, delay} + end + end + + test "FIR sending with throttle" do + remote_ssrc = 4_194_443_425 + half_throttle_duration = div(@fir_throttle_duration_ms, 2) + + opts = %Testing.Pipeline.Options{ + elements: [ + src: NoopSource, + rtp: %StreamReceiveBin{ + clock_rate: @h264_clock_rate, + depayloader: H264.Depayloader, + local_ssrc: 0, + remote_ssrc: remote_ssrc, + rtcp_report_interval: nil + }, + sink: %KeyframeRequester{delay: trunc(@fir_throttle_duration_ms * 1.1)} + ] + } + + {:ok, pipeline} = Testing.Pipeline.start_link(opts) + + assert_pipeline_playback_changed(pipeline, _, :playing) + assert_pipeline_notified(pipeline, :src, %Membrane.RTCPEvent{rtcp: rtcp}) + assert %FeedbackPacket{payload: fir} = rtcp + assert fir == %FeedbackPacket.FIR{target_ssrc: remote_ssrc, seq_num: 0} + + # Ensure we're not getting it twice before throttle duration passes + refute_pipeline_notified(pipeline, :src, %Membrane.RTCPEvent{}, half_throttle_duration) + + # Then ensure we get the next one + assert_pipeline_notified( + pipeline, + :src, + %Membrane.RTCPEvent{rtcp: rtcp}, + @fir_throttle_duration_ms + ) + + assert %FeedbackPacket{payload: fir} = rtcp + assert fir == %FeedbackPacket.FIR{target_ssrc: remote_ssrc, seq_num: 1} + + # ... and only one + refute_pipeline_notified(pipeline, :src, %Membrane.RTCPEvent{}, half_throttle_duration) + Testing.Pipeline.terminate(pipeline, blocking?: true) + end end From 6f2cf71275447b7deb24dd137aafe8b639a6b1cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Wed, 6 Jul 2022 15:50:15 +0200 Subject: [PATCH 10/13] Add RTCP.Parser test --- lib/membrane/rtcp/parser.ex | 2 +- test/membrane/rtcp/afb_packet_test.exs | 11 ++-------- test/membrane/rtcp/parser_test.exs | 29 ++++++++++++++++++++++++++ test/support/rtcp_fixtures.ex | 24 +++++++++++++++++++++ 4 files changed, 56 insertions(+), 10 deletions(-) create mode 100644 test/membrane/rtcp/parser_test.exs diff --git a/lib/membrane/rtcp/parser.ex b/lib/membrane/rtcp/parser.ex index d68a4eb1..cec12590 100644 --- a/lib/membrane/rtcp/parser.ex +++ b/lib/membrane/rtcp/parser.ex @@ -101,7 +101,7 @@ defmodule Membrane.RTCP.Parser do ) do # maybe TODO: handle REMB extension # Even though we do not support REMB and do not advertise such support in SDP, - # browsers ignore that and send REMP packets for video ¯\_(ツ)_/¯ + # browsers ignore that and send REMB packets for video as part of sender report ¯\_(ツ)_/¯ [] end diff --git a/test/membrane/rtcp/afb_packet_test.exs b/test/membrane/rtcp/afb_packet_test.exs index 9c195280..115421e5 100644 --- a/test/membrane/rtcp/afb_packet_test.exs +++ b/test/membrane/rtcp/afb_packet_test.exs @@ -1,18 +1,11 @@ defmodule Membrane.RTCP.FeedbackPacket.AFBTest do use ExUnit.Case, async: true - alias Membrane.RTCP.{Packet, FeedbackPacket, SdesPacket, SenderReportPacket} - - # Real, compound, RTCP packet from browser containing SR, Sdes & REMB - @remb_rtcp_packet <<128, 200, 0, 6, 120, 61, 239, 185, 230, 110, 197, 157, 82, 42, 144, 205, - 172, 85, 146, 216, 0, 0, 3, 121, 0, 14, 109, 134, 129, 202, 0, 6, 120, 61, - 239, 185, 1, 16, 116, 79, 97, 68, 55, 57, 102, 72, 120, 105, 119, 102, 110, - 56, 120, 85, 0, 0, 143, 206, 0, 5, 120, 61, 239, 185, 0, 0, 0, 0, 82, 69, - 77, 66, 1, 15, 36, 147, 224, 97, 78, 29>> + alias Membrane.RTCP.{FeedbackPacket, Fixtures, Packet, SdesPacket, SenderReportPacket} test "AFB Packet can be parsed" do assert {:ok, [%SenderReportPacket{}, %SdesPacket{}, %FeedbackPacket{payload: fb_payload}]} = - Packet.parse(@remb_rtcp_packet) + Packet.parse(Fixtures.compound_sr_sdes_remb()) assert %FeedbackPacket.AFB{message: binary} = fb_payload assert "REMB" <> _rest = binary diff --git a/test/membrane/rtcp/parser_test.exs b/test/membrane/rtcp/parser_test.exs new file mode 100644 index 00000000..b327347e --- /dev/null +++ b/test/membrane/rtcp/parser_test.exs @@ -0,0 +1,29 @@ +defmodule Membrane.RTCP.ParserTest do + use ExUnit.Case, async: true + + alias Membrane.{Buffer, RTCPEvent} + alias Membrane.RTCP.{FeedbackPacket, Fixtures, Parser, SenderReportPacket} + + test "Handles SR with REMB" do + buffer = %Buffer{payload: Fixtures.compound_sr_sdes_remb(), metadata: %{arrival_ts: 2137}} + state = %{} + assert {{:ok, events}, ^state} = Parser.handle_process(:input, buffer, %{}, state) + assert [event: {:output, %RTCPEvent{} = rtcp_event}] = events + assert rtcp_event.arrival_timestamp == 2137 + assert %SenderReportPacket{} = rtcp_event.rtcp + end + + test "Handles PLI" do + buffer = %Buffer{payload: Fixtures.pli_packet(), metadata: %{arrival_ts: 2137}} + state = %{} + assert {{:ok, events}, ^state} = Parser.handle_process(:input, buffer, %{}, state) + assert [event: {:output, %RTCPEvent{} = rtcp_event}] = events + assert rtcp_event.arrival_timestamp == 2137 + assert %{rtcp: %FeedbackPacket{} = feedback} = rtcp_event + + fixture_contents = Fixtures.pli_contents() + assert feedback.target_ssrc == fixture_contents.target_ssrc + assert feedback.origin_ssrc == fixture_contents.origin_ssrc + assert feedback.payload == %FeedbackPacket.PLI{} + end +end diff --git a/test/support/rtcp_fixtures.ex b/test/support/rtcp_fixtures.ex index fc3e5e06..8ca79b8f 100644 --- a/test/support/rtcp_fixtures.ex +++ b/test/support/rtcp_fixtures.ex @@ -35,6 +35,30 @@ defmodule Membrane.RTCP.Fixtures do |> Enum.map(&hex_to_bin/1) end + @doc """ + Returns a real, compound RTCP packet from browser containing SenderReport, Sdes & AFB with REMB + """ + @spec compound_sr_sdes_remb() :: binary() + def compound_sr_sdes_remb() do + <<128, 200, 0, 6, 120, 61, 239, 185, 230, 110, 197, 157, 82, 42, 144, 205, 172, 85, 146, 216, + 0, 0, 3, 121, 0, 14, 109, 134, 129, 202, 0, 6, 120, 61, 239, 185, 1, 16, 116, 79, 97, 68, + 55, 57, 102, 72, 120, 105, 119, 102, 110, 56, 120, 85, 0, 0, 143, 206, 0, 5, 120, 61, 239, + 185, 0, 0, 0, 0, 82, 69, 77, 66, 1, 15, 36, 147, 224, 97, 78, 29>> + end + + @spec pli_packet() :: binary() + def pli_packet() do + <<0x81, 0xCE, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x41, 0x6F, 0xB1, 0x0D>> + end + + @spec pli_contents() :: binary() + def pli_contents() do + %{ + origin_ssrc: 1, + target_ssrc: 0x416FB10D + } + end + @spec twcc_feedbacks() :: [binary()] def twcc_feedbacks() do @twcc_feedbacks From 98cab710be204070805d57782086080f8bbf5183 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Fri, 8 Jul 2022 12:30:04 +0200 Subject: [PATCH 11/13] RTCP.Parser: bring back event forward, refactor --- lib/membrane/rtcp/parser.ex | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/membrane/rtcp/parser.ex b/lib/membrane/rtcp/parser.ex index cec12590..51a04a12 100644 --- a/lib/membrane/rtcp/parser.ex +++ b/lib/membrane/rtcp/parser.ex @@ -27,9 +27,8 @@ defmodule Membrane.RTCP.Parser do @impl true def handle_prepared_to_playing(_ctx, state) do - {{:ok, - caps: {:receiver_report_output, %RemoteStream{type: :packetized, content_format: RTCP}}}, - state} + caps = %RemoteStream{type: :packetized, content_format: RTCP} + {{:ok, caps: {:receiver_report_output, caps}}, state} end @impl true @@ -63,13 +62,16 @@ defmodule Membrane.RTCP.Parser do {{:ok, buffer: {:receiver_report_output, buffer}}, state} end + @impl true + def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state) + defp process_packets(rtcp, metadata) do Enum.flat_map(rtcp, &process_rtcp(&1, metadata)) end defp process_rtcp(%RTCP.FeedbackPacket{payload: %keyframe_request{}} = packet, metadata) when keyframe_request in [RTCP.FeedbackPacket.FIR, RTCP.FeedbackPacket.PLI] do - event = wrap_with_rtcp_event(packet, packet.target_ssrc, metadata) + event = to_rtcp_event(packet, packet.target_ssrc, metadata) [event: {:output, event}] end @@ -81,14 +83,14 @@ defmodule Membrane.RTCP.Parser do end defp process_rtcp(%RTCP.SenderReportPacket{ssrc: ssrc} = packet, metadata) do - event = wrap_with_rtcp_event(packet, ssrc, metadata) + event = to_rtcp_event(packet, ssrc, metadata) [event: {:output, event}] end defp process_rtcp(%RTCP.ReceiverReportPacket{reports: reports}, metadata) do reports |> Enum.map(fn report -> - event = wrap_with_rtcp_event(report, report.ssrc, metadata) + event = to_rtcp_event(report, report.ssrc, metadata) {:event, {:output, event}} end) end @@ -125,7 +127,7 @@ defmodule Membrane.RTCP.Parser do [] end - defp wrap_with_rtcp_event(rtcp_packet, ssrc, metadata) do + defp to_rtcp_event(rtcp_packet, ssrc, metadata) do %RTCPEvent{ rtcp: rtcp_packet, ssrcs: [ssrc], From 0bdf9377adec1e6cc85bf7a43f5e3709f83d132a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Fri, 8 Jul 2022 12:31:21 +0200 Subject: [PATCH 12/13] Add comment on KeyframeRequestEvent --- lib/membrane/rtp/outbound_packet_tracker.ex | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/membrane/rtp/outbound_packet_tracker.ex b/lib/membrane/rtp/outbound_packet_tracker.ex index 09475a41..f7572368 100644 --- a/lib/membrane/rtp/outbound_packet_tracker.ex +++ b/lib/membrane/rtp/outbound_packet_tracker.ex @@ -102,9 +102,13 @@ defmodule Membrane.RTP.OutboundPacketTracker do state ) when keyframe_request in [PLI, FIR] do + # PLI or FIR reaching OutboundPacketTracker means the receiving peer sent it + # We need to pass it to the sending peer's RTCP.Receiver (in StreamReceiveBin) to get translated again into FIR/PLI with proper SSRCs + # and then sent to the sender. So the KeyframeRequestEvent, like salmon, starts an upstream journey here trying to reach that peer. {{:ok, event: {:input, %Membrane.KeyframeRequestEvent{}}}, state} end + @impl true def handle_event(pad, event, ctx, state) do super(pad, event, ctx, state) end From 88d517f7a851ca8c98c2647c50cad0fb45181f2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20B=C5=82aszk=C3=B3w?= Date: Fri, 8 Jul 2022 12:32:50 +0200 Subject: [PATCH 13/13] Tiny refactor of StreamReceiveBin test --- test/membrane/rtp/stream_receive_bin_test.exs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/membrane/rtp/stream_receive_bin_test.exs b/test/membrane/rtp/stream_receive_bin_test.exs index ae9307ee..b8a277d7 100644 --- a/test/membrane/rtp/stream_receive_bin_test.exs +++ b/test/membrane/rtp/stream_receive_bin_test.exs @@ -142,9 +142,10 @@ defmodule Membrane.RTP.StreamReceiveBinTest do end end - test "FIR sending with throttle" do + test "FIRs are sent and throttled" do remote_ssrc = 4_194_443_425 half_throttle_duration = div(@fir_throttle_duration_ms, 2) + delta = div(@fir_throttle_duration_ms, 10) opts = %Testing.Pipeline.Options{ elements: [ @@ -156,7 +157,7 @@ defmodule Membrane.RTP.StreamReceiveBinTest do remote_ssrc: remote_ssrc, rtcp_report_interval: nil }, - sink: %KeyframeRequester{delay: trunc(@fir_throttle_duration_ms * 1.1)} + sink: %KeyframeRequester{delay: @fir_throttle_duration_ms + delta} ] } @@ -175,7 +176,7 @@ defmodule Membrane.RTP.StreamReceiveBinTest do pipeline, :src, %Membrane.RTCPEvent{rtcp: rtcp}, - @fir_throttle_duration_ms + half_throttle_duration + delta ) assert %FeedbackPacket{payload: fir} = rtcp