diff --git a/lib/membrane/rtcp/receiver.ex b/lib/membrane/rtcp/receiver.ex index 5fd636c5..039a3f56 100644 --- a/lib/membrane/rtcp/receiver.ex +++ b/lib/membrane/rtcp/receiver.ex @@ -11,6 +11,7 @@ defmodule Membrane.RTCP.Receiver do alias Membrane.Time require Membrane.Logger + require Membrane.TelemetryMetrics def_input_pad :input, caps: :any, demand_mode: :auto def_output_pad :output, caps: :any, demand_mode: :auto @@ -18,10 +19,14 @@ defmodule Membrane.RTCP.Receiver do def_options local_ssrc: [spec: RTP.ssrc_t()], remote_ssrc: [spec: RTP.ssrc_t()], report_interval: [spec: Membrane.Time.t() | nil, default: nil], - fir_interval: [spec: Membrane.Time.t() | nil, default: nil] + fir_interval: [spec: Membrane.Time.t() | nil, default: nil], + telemetry_label: [spec: Membrane.TelemetryMetrics.label(), default: []] + + @event_name [Membrane.RTP, :rtcp, :fir, :sent] @impl true def handle_init(opts) do + Membrane.TelemetryMetrics.register(@event_name, opts.telemetry_label) {:ok, Map.from_struct(opts) |> Map.merge(%{fir_seq_num: 0, sr_info: %{}})} end @@ -121,6 +126,8 @@ defmodule Membrane.RTCP.Receiver do } } + Membrane.TelemetryMetrics.execute(@event_name, %{}, %{}, state.telemetry_label) + event = %RTCPEvent{rtcp: rtcp} state = Map.update!(state, :fir_seq_num, &(&1 + 1)) {{:ok, event: {:input, event}}, state} diff --git a/lib/membrane/rtp/metrics.ex b/lib/membrane/rtp/metrics.ex new file mode 100644 index 00000000..540bd8be --- /dev/null +++ b/lib/membrane/rtp/metrics.ex @@ -0,0 +1,37 @@ +defmodule Membrane.RTP.Metrics do + @moduledoc """ + Defines list of metrics, that can be aggregated based on events from membrane_rtp_plugin. + """ + + @doc """ + Returns list of metrics, that can be aggregated based on events from membrane_rtp_plugin. + """ + @spec metrics() :: [Telemetry.Metrics.t()] + def metrics() do + [ + Telemetry.Metrics.counter( + "inbound-rtp.keyframe_request_sent", + event_name: [Membrane.RTP, :rtcp, :fir, :sent] + ), + Telemetry.Metrics.counter( + "inbound-rtp.packets", + event_name: [Membrane.RTP, :packet, :arrival] + ), + Telemetry.Metrics.sum( + "inbound-rtp.bytes_received", + event_name: [Membrane.RTP, :packet, :arrival], + measurement: :bytes + ), + Telemetry.Metrics.last_value( + "inbound-rtp.encoding", + event_name: [Membrane.RTP, :inbound_track, :new], + measurement: :encoding + ), + Telemetry.Metrics.last_value( + "inbound-rtp.ssrc", + event_name: [Membrane.RTP, :inbound_track, :new], + measurement: :ssrc + ) + ] + end +end diff --git a/lib/membrane/rtp/payload_type_resolver.ex b/lib/membrane/rtp/payload_type_resolver.ex index f136af64..8b7aeac8 100644 --- a/lib/membrane/rtp/payload_type_resolver.ex +++ b/lib/membrane/rtp/payload_type_resolver.ex @@ -30,4 +30,20 @@ defmodule Membrane.RTP.PayloadFormatResolver do payloader -> {:ok, payloader} end end + + @spec keyframe_detector(atom()) :: {:ok, (binary() -> boolean())} | :error + def keyframe_detector(encoding) do + case PayloadFormat.get(encoding).keyframe_detector do + nil -> :error + keyframe_detector -> {:ok, keyframe_detector} + end + end + + @spec frame_detector(atom()) :: {:ok, (binary() -> boolean())} | :error + def frame_detector(encoding) do + case PayloadFormat.get(encoding).frame_detector do + nil -> :error + frame_detector -> {:ok, frame_detector} + end + end end diff --git a/lib/membrane/rtp/session_bin.ex b/lib/membrane/rtp/session_bin.ex index 4ad3da11..c17f3228 100644 --- a/lib/membrane/rtp/session_bin.ex +++ b/lib/membrane/rtp/session_bin.ex @@ -198,6 +198,15 @@ defmodule Membrane.RTP.SessionBin do If set to nil then the depayloading process gets skipped. """ ], + telemetry_label: [ + spec: Membrane.TelemetryMetrics.label(), + default: [], + description: "Label passed to Membrane.TelemetryMetrics functions" + ], + encoding: [ + spec: RTP.encoding_name_t() | nil, + default: nil + ], clock_rate: [ spec: integer() | nil, default: nil, @@ -383,6 +392,8 @@ defmodule Membrane.RTP.SessionBin do clock_rate: clock_rate, rtp_extensions: rtp_extensions, rtcp_fir_interval: fir_interval, + encoding: encoding, + telemetry_label: telemetry_label, extensions: extensions } = ctx.pads[pad].options @@ -402,6 +413,7 @@ defmodule Membrane.RTP.SessionBin do remote_ssrc: ssrc, rtcp_report_interval: state.rtcp_receiver_report_interval, rtcp_fir_interval: fir_interval, + telemetry_label: telemetry_label, secure?: state.secure?, srtp_policies: state.srtp_policies } @@ -410,9 +422,14 @@ defmodule Membrane.RTP.SessionBin do {rtp_extensions, maybe_link_twcc_receiver, state} = maybe_handle_twcc_receiver(rtp_extensions, ssrc, ctx, state) + ssrc_router_pad_options = [ + encoding: encoding, + telemetry_label: telemetry_label + ] + router_link = link(:ssrc_router) - |> via_out(Pad.ref(:output, ssrc)) + |> via_out(Pad.ref(:output, ssrc), options: ssrc_router_pad_options) |> then(maybe_link_twcc_receiver) |> to(rtp_stream_name) diff --git a/lib/membrane/rtp/ssrc_router.ex b/lib/membrane/rtp/ssrc_router.ex index 9972f762..b345ae4c 100644 --- a/lib/membrane/rtp/ssrc_router.ex +++ b/lib/membrane/rtp/ssrc_router.ex @@ -15,9 +15,27 @@ defmodule Membrane.RTP.SSRCRouter do alias Membrane.{RTCP, RTP, RTCPEvent, SRTP} + require Membrane.TelemetryMetrics + + @packet_arrival_event [Membrane.RTP, :packet, :arrival] + @new_inbound_track_event [Membrane.RTP, :inbound_track, :new] + def_input_pad :input, caps: [RTCP, RTP], availability: :on_request, demand_mode: :auto - def_output_pad :output, caps: RTP, availability: :on_request, demand_mode: :auto + def_output_pad :output, + caps: RTP, + availability: :on_request, + demand_mode: :auto, + options: [ + telemetry_label: [ + spec: Membrane.TelemetryMetrics.label(), + default: [] + ], + encoding: [ + spec: atom() | nil, + default: nil + ] + ] defmodule State do @moduledoc false @@ -26,7 +44,7 @@ defmodule Membrane.RTP.SSRCRouter do alias Membrane.RTP @type t() :: %__MODULE__{ - input_pads: %{RTP.ssrc_t() => input_pad :: Pad.ref_t()}, + input_pads: %{RTP.ssrc_t() => [input_pad :: Pad.ref_t()]}, buffered_actions: %{RTP.ssrc_t() => [Membrane.Element.Action.t()]}, srtp_keying_material_event: struct() | nil } @@ -62,8 +80,15 @@ defmodule Membrane.RTP.SSRCRouter do end @impl true - def handle_pad_added(Pad.ref(:output, ssrc) = pad, _ctx, state) do + def handle_pad_added(Pad.ref(:output, ssrc) = pad, ctx, state) do {buffered_actions, state} = pop_in(state, [:buffered_actions, ssrc]) + buffered_actions = Enum.reverse(buffered_actions || []) + + register_packet_arrival_event(pad, ctx) + emit_packet_arrival_events(buffered_actions, ctx) + + register_new_inbound_track_event(pad, ctx) + emit_new_inbound_track_event(ssrc, pad, ctx) events = if state.srtp_keying_material_event do @@ -72,7 +97,7 @@ defmodule Membrane.RTP.SSRCRouter do [] end - {{:ok, [caps: {pad, %RTP{}}] ++ events ++ Enum.reverse(buffered_actions || [])}, state} + {{:ok, [caps: {pad, %RTP{}}] ++ events ++ buffered_actions}, state} end @impl true @@ -104,6 +129,8 @@ defmodule Membrane.RTP.SSRCRouter do action = {:buffer, {Pad.ref(:output, ssrc), buffer}} {actions, state} = maybe_buffer_action(action, ssrc, ctx, state) + emit_packet_arrival_events(actions, ctx) + {{:ok, new_stream_actions ++ actions}, state} end @@ -182,5 +209,52 @@ defmodule Membrane.RTP.SSRCRouter do end end + defp emit_packet_arrival_events(actions, ctx) do + for action <- actions do + with {:buffer, {pad, buffer}} <- action do + emit_packet_arrival_event(buffer.payload, pad, ctx) + end + end + end + + defp register_packet_arrival_event(pad, ctx) do + Membrane.TelemetryMetrics.register( + @packet_arrival_event, + ctx.pads[pad].options.telemetry_label + ) + end + + defp register_new_inbound_track_event(pad, ctx) do + Membrane.TelemetryMetrics.register( + @new_inbound_track_event, + ctx.pads[pad].options.telemetry_label + ) + end + + defp emit_packet_arrival_event(payload, pad, ctx) do + Membrane.TelemetryMetrics.execute( + @packet_arrival_event, + %{bytes: byte_size(payload)}, + %{}, + ctx.pads[pad].options.telemetry_label + ) + end + + defp emit_new_inbound_track_event(ssrc, pad, ctx) do + Membrane.TelemetryMetrics.execute( + @new_inbound_track_event, + %{ssrc: ssrc} |> maybe_add_encoding(pad, ctx), + %{}, + ctx.pads[pad].options.telemetry_label + ) + end + + defp maybe_add_encoding(measurements, pad, ctx) do + case ctx.pads[pad].options.encoding do + nil -> measurements + encoding -> Map.put(measurements, :encoding, encoding) + end + end + defp linked?(ssrc, ctx), do: Map.has_key?(ctx.pads, Pad.ref(:output, ssrc)) end diff --git a/lib/membrane/rtp/stream_receive_bin.ex b/lib/membrane/rtp/stream_receive_bin.ex index 2bb7e945..00d6fbbf 100644 --- a/lib/membrane/rtp/stream_receive_bin.ex +++ b/lib/membrane/rtp/stream_receive_bin.ex @@ -30,7 +30,11 @@ defmodule Membrane.RTP.StreamReceiveBin do local_ssrc: [spec: Membrane.RTP.ssrc_t()], remote_ssrc: [spec: Membrane.RTP.ssrc_t()], rtcp_report_interval: [spec: Membrane.Time.t() | nil], - rtcp_fir_interval: [spec: Membrane.Time.t() | nil] + rtcp_fir_interval: [spec: Membrane.Time.t() | nil], + telemetry_label: [ + spec: [{atom(), any()}], + default: [] + ] def_input_pad :input, demand_unit: :buffers, caps: :any def_output_pad :output, caps: :any, demand_unit: :buffers @@ -56,7 +60,8 @@ defmodule Membrane.RTP.StreamReceiveBin do local_ssrc: opts.local_ssrc, remote_ssrc: opts.remote_ssrc, report_interval: opts.rtcp_report_interval, - fir_interval: opts.rtcp_fir_interval + fir_interval: opts.rtcp_fir_interval, + telemetry_label: opts.telemetry_label }) |> to(:packet_tracker, %Membrane.RTP.InboundPacketTracker{ clock_rate: opts.clock_rate, diff --git a/mix.exs b/mix.exs index 67f57a17..97153a3f 100644 --- a/mix.exs +++ b/mix.exs @@ -67,8 +67,9 @@ defmodule Membrane.RTP.Plugin.MixProject do defp deps do [ + {:membrane_telemetry_metrics, github: "membraneframework/membrane_telemetry_metrics"}, {:membrane_core, "~> 0.10.0"}, - {:membrane_rtp_format, "~> 0.4.0"}, + {:membrane_rtp_format, github: "membraneframework/membrane_rtp_format", override: true}, {:ex_libsrtp, "~> 0.4.0", optional: true}, {:qex, "~> 0.5.1"}, {:bunch, "~> 1.0"}, @@ -76,7 +77,8 @@ defmodule Membrane.RTP.Plugin.MixProject do {:bimap, "~> 1.1.0"}, # Test - {:membrane_rtp_h264_plugin, "~> 0.11", only: :test}, + {:membrane_rtp_h264_plugin, + github: "membraneframework/membrane_rtp_h264_plugin", only: :test}, {:membrane_rtp_mpegaudio_plugin, "~> 0.9", only: :test}, {:membrane_h264_ffmpeg_plugin, "~> 0.19", only: :test}, {:membrane_pcap_plugin, diff --git a/mix.lock b/mix.lock index a0e30095..4aaade8c 100644 --- a/mix.lock +++ b/mix.lock @@ -24,14 +24,15 @@ "membrane_caps_audio_mpeg": {:hex, :membrane_caps_audio_mpeg, "0.2.0", "9cf9a63f03e25b31cf31445325aa68e60a07d36ee1e759caa1422fa45df49367", [:mix], [], "hexpm", "f7a80e4841d46164c148be880932ac7425329f4bcc32eb36ad2e47eafe5f23e4"}, "membrane_common_c": {:hex, :membrane_common_c, "0.13.0", "c314623f93209eb2fa092379954c686f6e50ac89baa48360f836d24f4d53f5ee", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "90181fbbe481ccd0a4a76daf0300f8ad1b5b0bf0ebd8b42c133904f8839663ca"}, "membrane_core": {:hex, :membrane_core, "0.10.1", "b4fb68d9e541888b60ebbf4e22c4913a84f35c955846b7df26154cb7c5ce0f78", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d039f67d00cb1c1608b1e4f03cea8a9b5a88ff0fef3f61f54dc65e515b9dc286"}, - "membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.21.0", "7c151e7bedcb7087e8441f7574c4277ba2833e115da91b58cfe848d2e0266df2", [:mix], [{:bunch, "~> 1.3.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.13.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.2.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:ratio, "~> 2.4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "18d4da4e185b15e8cacb59ebcb468311faa0c4031f0b7530689b2d5cc42cb984"}, + "membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.21.1", "1e242c68f9c7f589b0288aa7dc39dacd01140df81a3add707ac8f1d3adbb7ced", [:mix], [{:bunch, "~> 1.3.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.13.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.2.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:ratio, "~> 2.4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "76ba51c3f69c7ef2888d946ecd62026f03a1dc87b263e6887f8ba0079fcb5e1e"}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.3.0", "84426aac86c3f4d3e8110438c3514ad94aa528e7002650d40e3b3862e2af5e3e", [:mix], [], "hexpm", "8254e52cea3c5d7c078c960a32f1ba338eeae9e301515302fd293f1683fa8dd9"}, "membrane_hackney_plugin": {:hex, :membrane_hackney_plugin, "0.8.2", "6b83628cc2019aa0b143c09e77f2dd9199a05528599d93c289dcab2e947369fa", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "42906166b3692ba2270deb61721225ca7edadd1dbde6a44435664234a93597e2"}, "membrane_pcap_plugin": {:git, "https://github.com/membraneframework/membrane_pcap_plugin.git", "2bab35e62ac87e5a2c5d514dfb0b51de12a3ed96", [tag: "v0.6.1"]}, "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.2.0", "cda8eb207cf65c93690a19001aba3edbb2ba5d22abc8068a1f6a785ba871e8cf", [:mix], [], "hexpm", "6b716fc24f60834323637c95aaaa0f99be23fcc6a84a21af70195ef50185b634"}, - "membrane_rtp_format": {:hex, :membrane_rtp_format, "0.4.0", "be84e88206c6a91363660eeb7cbf03330cd9a00486fb4bce4a7b86a4172d3a4b", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "44944956c1031f49269b7d447525b7c3d1026120b100b4f0dfaca5dc228c65e0"}, - "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.12.0", "922955606a53424e85d5035204f3d33e4d84eee306a316642757d0d4c2a3b605", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.4.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "c886277bf228dfc9882ec0bef4237511b61fbb91aa8854304a67342a1392f5ef"}, + "membrane_rtp_format": {:git, "https://github.com/membraneframework/membrane_rtp_format.git", "7f135ad0168324970d214d04c6a46b90a59e1c3c", []}, + "membrane_rtp_h264_plugin": {:git, "https://github.com/membraneframework/membrane_rtp_h264_plugin.git", "807befb264775c9994108403214e039d039e18b1", []}, "membrane_rtp_mpegaudio_plugin": {:hex, :membrane_rtp_mpegaudio_plugin, "0.10.0", "cc97d183e246e86c75877a898db440df9ab917f92260643fcd071611fd0c420a", [:mix], [{:membrane_caps_audio_mpeg, "~> 0.2.0", [hex: :membrane_caps_audio_mpeg, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.4.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "3f832b5a514bbca149e88c0fa83d10bfd006d5f83af2a48336602288096c5bc3"}, + "membrane_telemetry_metrics": {:git, "https://github.com/membraneframework/membrane_telemetry_metrics.git", "1ed21f9b75fac9d5520a26976ee252cf8953e8fa", []}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"}, @@ -45,6 +46,7 @@ "shmex": {:hex, :shmex, "0.5.0", "7dc4fb1a8bd851085a652605d690bdd070628717864b442f53d3447326bcd3e8", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "b67bb1e22734758397c84458dbb746519e28eac210423c267c7248e59fc97bdc"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "unifex": {:hex, :unifex, "1.0.0", "a8a2ac6f6f437dd689db8c680df53e28e43c0548cd58ba2af36a1241f66fbc62", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "23b30d5d9d65bb77c25397e476d86818df7b6519bcb0ea816429a70d0729c14d"}, }