Skip to content

Commit

Permalink
Merge pull request #89 from membraneframework/auto-demand
Browse files Browse the repository at this point in the history
switch to auto demands
  • Loading branch information
mat-hek authored Feb 8, 2022
2 parents 5c1d7af + 145ee34 commit 47cdf49
Show file tree
Hide file tree
Showing 23 changed files with 125 additions and 228 deletions.
11 changes: 3 additions & 8 deletions lib/membrane/rtcp/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ defmodule Membrane.RTCP.Parser do

def_input_pad :input,
caps: {RemoteStream, type: :packetized, content_format: one_of([nil, RTP])},
demand_unit: :buffers
demand_mode: :auto

def_output_pad :output, caps: RTCP
def_output_pad :output, caps: RTCP, demand_mode: :auto
def_output_pad :rtcp_output, mode: :push, caps: :any

@impl true
Expand Down Expand Up @@ -47,11 +47,6 @@ defmodule Membrane.RTCP.Parser do
end
end

@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end

@impl true
def handle_event(:output, %RTCPEvent{} = event, _ctx, state) do
buffer = %Buffer{payload: RTCP.Packet.serialize(event.rtcp)}
Expand All @@ -62,7 +57,7 @@ defmodule Membrane.RTCP.Parser do
def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

defp process_packets(rtcp, metadata) do
Enum.flat_map(rtcp, &process_rtcp(&1, metadata)) ++ [redemand: :output]
Enum.flat_map(rtcp, &process_rtcp(&1, metadata))
end

defp process_rtcp(%RTCP.FeedbackPacket{payload: %RTCP.FeedbackPacket.PLI{}}, _metadata) do
Expand Down
9 changes: 2 additions & 7 deletions lib/membrane/rtcp/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ defmodule Membrane.RTCP.Receiver do

require Membrane.Logger

def_input_pad :input, demand_unit: :buffers, caps: :any
def_output_pad :output, caps: :any
def_input_pad :input, caps: :any, demand_mode: :auto
def_output_pad :output, caps: :any, demand_mode: :auto

def_options local_ssrc: [spec: RTP.ssrc_t()],
remote_ssrc: [spec: RTP.ssrc_t()],
Expand Down Expand Up @@ -107,11 +107,6 @@ defmodule Membrane.RTCP.Receiver do
@impl true
def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end

@impl true
def handle_process(:input, buffer, _ctx, state) do
{{:ok, buffer: {:output, buffer}}, state}
Expand Down
11 changes: 3 additions & 8 deletions lib/membrane/rtp/inbound_packet_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ defmodule Membrane.RTP.InboundPacketTracker do
@max_s24_val Bitwise.bsl(1, 23) - 1
@min_s24_val -Bitwise.bsl(1, 23)

def_input_pad :input, demand_unit: :buffers, caps: :any
def_output_pad :output, caps: :any
def_input_pad :input, caps: :any, demand_mode: :auto
def_output_pad :output, caps: :any, demand_mode: :auto

def_options clock_rate: [
type: :integer,
Expand Down Expand Up @@ -75,11 +75,6 @@ defmodule Membrane.RTP.InboundPacketTracker do
%State{clock_rate: opts.clock_rate, repair_sequence_numbers?: opts.repair_sequence_numbers?}}
end

@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end

@impl true
def handle_process(:input, buffer, _ctx, %State{cycles: cycles, max_seq: max_seq} = state) do
seq_num = buffer.metadata.rtp.sequence_number
Expand All @@ -100,7 +95,7 @@ defmodule Membrane.RTP.InboundPacketTracker do

# the packets is either too old or too new
delta <= @max_seq_num - @max_unordered ->
{{:ok, redemand: :output}, update_received(state)}
{:ok, update_received(state)}

# packet is old but within dropout threshold
true ->
Expand Down
15 changes: 4 additions & 11 deletions lib/membrane/rtp/jitter_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ defmodule Membrane.RTP.JitterBuffer do

@timestamp_limit Bitwise.bsl(1, 32)

def_output_pad :output,
caps: RTP
def_output_pad :output, caps: RTP, demand_mode: :auto

def_input_pad :input,
caps: RTP,
demand_unit: :buffers
def_input_pad :input, caps: RTP, demand_mode: :auto

@default_latency 200 |> Time.milliseconds()

Expand Down Expand Up @@ -75,10 +72,6 @@ defmodule Membrane.RTP.JitterBuffer do
{:ok, %{state | waiting?: true}}
end

@impl true
def handle_demand(:output, size, :buffers, _ctx, state),
do: {{:ok, demand: {:input, size}}, state}

@impl true
def handle_end_of_stream(:input, _context, %State{store: store} = state) do
{actions, state} =
Expand Down Expand Up @@ -113,7 +106,7 @@ defmodule Membrane.RTP.JitterBuffer do

{:error, :late_packet} ->
warn("Late packet has arrived")
{{:ok, redemand: :output}, state}
{:ok, state}
end
end

Expand Down Expand Up @@ -142,7 +135,7 @@ defmodule Membrane.RTP.JitterBuffer do

state = %{state | store: store} |> set_timer()

{{:ok, actions ++ [redemand: :output]}, state}
{{:ok, actions}, state}
end

@spec set_timer(State.t()) :: State.t()
Expand Down
52 changes: 20 additions & 32 deletions lib/membrane/rtp/outbound_packet_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,16 @@ defmodule Membrane.RTP.OutboundPacketTracker do
alias Membrane.{Buffer, RTP, Payload, Time}
alias Membrane.RTP.Session.SenderReport

def_input_pad :input,
caps: :any,
demand_unit: :buffers
def_input_pad :input, caps: :any, demand_mode: :auto

def_output_pad :output,
caps: :any
def_output_pad :output, caps: :any, demand_mode: :auto

def_input_pad :rtcp_input,
availability: :on_request,
caps: :any,
demand_unit: :buffers
demand_mode: :auto

def_output_pad :rtcp_output, availability: :on_request, caps: :any
def_output_pad :rtcp_output, availability: :on_request, caps: :any, demand_mode: :auto

def_options ssrc: [spec: RTP.ssrc_t()],
payload_type: [spec: RTP.payload_type_t()],
Expand Down Expand Up @@ -79,16 +76,6 @@ defmodule Membrane.RTP.OutboundPacketTracker do
{:ok, state}
end

@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end

@impl true
def handle_demand(Pad.ref(:rtcp_output, _id), _size, _type, _ctx, state) do
{:ok, state}
end

@impl true
def handle_pad_added(Pad.ref(:rtcp_input, _id), _ctx, state) do
{:ok, state}
Expand Down Expand Up @@ -138,21 +125,22 @@ defmodule Membrane.RTP.OutboundPacketTracker do
end

@impl true
def handle_other(:send_stats, _ctx, %{rtcp_output_pad: nil} = state) do
{:ok, state}
end

@impl true
def handle_other(:send_stats, _ctx, state) do
stats = get_stats(state)

actions =
%{state.ssrc => stats}
|> SenderReport.generate_report()
|> Enum.map(&Membrane.RTCP.Packet.serialize(&1))
|> Enum.map(&{:buffer, {state.rtcp_output_pad, %Membrane.Buffer{payload: &1}}})

{{:ok, actions ++ [redemand: state.rtcp_output_pad]}, %{state | any_buffer_sent?: false}}
def handle_other(:send_stats, ctx, state) do
%{rtcp_output_pad: rtcp_output} = state

if rtcp_output && not ctx.pads[rtcp_output].end_of_stream? do
stats = get_stats(state)

actions =
%{state.ssrc => stats}
|> SenderReport.generate_report()
|> Enum.map(&Membrane.RTCP.Packet.serialize(&1))
|> Enum.map(&{:buffer, {rtcp_output, %Membrane.Buffer{payload: &1}}})

{{:ok, actions}, %{state | any_buffer_sent?: false}}
else
{:ok, state}
end
end

defp get_stats(%State{any_buffer_sent?: false}), do: :no_stats
Expand Down
22 changes: 7 additions & 15 deletions lib/membrane/rtp/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Membrane.RTP.Parser do
use Membrane.Filter

alias Membrane.Buffer
alias Membrane.{RTCPEvent, RTP, RemoteStream}
alias Membrane.{RTCPEvent, RTCP, RTP, RemoteStream}

require Membrane.Logger

Expand All @@ -43,10 +43,10 @@ defmodule Membrane.RTP.Parser do
]

def_input_pad :input,
caps: {RemoteStream, type: :packetized, content_format: one_of([nil, RTP])},
demand_unit: :buffers
caps: {RemoteStream, type: :packetized, content_format: one_of([nil, RTP, RTCP])},
demand_mode: :auto

def_output_pad :output, caps: RTP
def_output_pad :output, caps: RTP, demand_mode: :auto

def_output_pad :rtcp_output, mode: :push, caps: :any, availability: :on_request

Expand Down Expand Up @@ -83,11 +83,8 @@ defmodule Membrane.RTP.Parser do
else
:rtcp ->
case state.rtcp_output_pad do
nil ->
{{:ok, redemand: :output}, state}

pad ->
{{:ok, buffer: {pad, buffer}, redemand: :output}, state}
nil -> {:ok, state}
pad -> {{:ok, buffer: {pad, buffer}}, state}
end

{:error, reason} ->
Expand All @@ -97,15 +94,10 @@ defmodule Membrane.RTP.Parser do
Reason: #{inspect(reason)}. Ignoring packet.
""")

{{:ok, redemand: :output}, state}
{:ok, state}
end
end

@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end

@impl true
def handle_event(:output, %RTCPEvent{} = event, _ctx, state) do
case state.rtcp_output_pad do
Expand Down
12 changes: 5 additions & 7 deletions lib/membrane/rtp/serializer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ defmodule Membrane.RTP.Serializer do
@max_seq_num Bitwise.bsl(1, 16) - 1
@max_timestamp Bitwise.bsl(1, 32) - 1

def_input_pad :input, caps: RTP, demand_unit: :buffers
def_output_pad :output, caps: {RemoteStream, type: :packetized, content_format: RTP}
def_input_pad :input, caps: RTP, demand_mode: :auto

def_output_pad :output,
caps: {RemoteStream, type: :packetized, content_format: RTP},
demand_mode: :auto

def_options ssrc: [spec: RTP.ssrc_t()],
payload_type: [spec: RTP.payload_type_t()],
Expand Down Expand Up @@ -73,11 +76,6 @@ defmodule Membrane.RTP.Serializer do
{{:ok, caps: {:output, caps}}, state}
end

@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end

@impl true
def handle_process(:input, buffer, _ctx, state) do
{rtp_metadata, metadata} = Map.pop(buffer.metadata, :rtp, %{})
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane/rtp/session_bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ defmodule Membrane.RTP.SessionBin do

@ssrc_boundaries 2..(Bitwise.bsl(1, 32) - 1)

@rtp_input_buffer_params [warn_size: 250, fail_size: 500]
@rtp_input_params [toilet_capacity: 500]

def_options fmt_mapping: [
spec: %{RTP.payload_type_t() => {RTP.encoding_name_t(), RTP.clock_rate_t()}},
Expand Down Expand Up @@ -341,7 +341,8 @@ defmodule Membrane.RTP.SessionBin do

links =
[
link_bin_input(pad, buffer: @rtp_input_buffer_params)
link_bin_input(pad)
|> via_in(:input, @rtp_input_params)
|> to({:rtp_parser, ref}, %RTP.Parser{secure?: secure?})
|> via_in(Pad.ref(:input, ref))
|> to(:ssrc_router)
Expand Down
11 changes: 3 additions & 8 deletions lib/membrane/rtp/silence_discarder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ defmodule Membrane.RTP.SilenceDiscarder do

alias Membrane.RTP.{Header, PacketsDiscardedEvent}

def_input_pad :input, caps: :any, demand_unit: :buffers
def_output_pad :output, caps: :any
def_input_pad :input, caps: :any, demand_mode: :auto
def_output_pad :output, caps: :any, demand_mode: :auto

def_options max_consecutive_drops: [
spec: non_neg_integer() | :infinity,
Expand Down Expand Up @@ -49,11 +49,6 @@ defmodule Membrane.RTP.SilenceDiscarder do
{:ok, Map.from_struct(opts) |> Map.put(:dropped, 0)}
end

@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end

@impl true
def handle_event(pad, other, ctx, state), do: super(pad, other, ctx, state)

Expand Down Expand Up @@ -83,7 +78,7 @@ defmodule Membrane.RTP.SilenceDiscarder do

cond do
audio_level >= silence_threshold ->
{{:ok, redemand: :output}, %{state | dropped: dropped + 1}}
{:ok, %{state | dropped: dropped + 1}}

dropped > 0 ->
stop_dropping(buffer, state)
Expand Down
30 changes: 4 additions & 26 deletions lib/membrane/rtp/ssrc_router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ defmodule Membrane.RTP.SSRCRouter do

alias Membrane.{RTP, RTCPEvent}

def_input_pad :input, demand_unit: :buffers, caps: RTP, availability: :on_request
def_input_pad :input, caps: RTP, availability: :on_request, demand_mode: :auto

def_input_pad :rtcp_input, demand_unit: :buffers, caps: :any, availability: :on_request
def_input_pad :rtcp_input, caps: :any, availability: :on_request, demand_mode: :auto

def_output_pad :output, caps: RTP, availability: :on_request
def_output_pad :output, caps: RTP, availability: :on_request, demand_mode: :auto

defmodule State do
@moduledoc false
Expand Down Expand Up @@ -86,27 +86,6 @@ defmodule Membrane.RTP.SSRCRouter do
{:ok, %State{state | output_pad_ids: MapSet.delete(state.output_pad_ids, ssrc)}}
end

@impl true
def handle_prepared_to_playing(ctx, state) do
actions =
ctx.pads
|> Enum.filter(fn {_pad_ref, pad_data} -> pad_data.direction == :input end)
|> Enum.map(fn {pad_ref, _pad_data} -> {:demand, {pad_ref, 1}} end)

{{:ok, actions}, state}
end

@impl true
def handle_demand(Pad.ref(:output, ssrc), _size, _unit, ctx, state) do
case state.input_pads do
%{^ssrc => input_pad} ->
{{:ok, demand: {input_pad, &(&1 + ctx.incoming_demand)}}, state}

_pads ->
{:ok, state}
end
end

@impl true
def handle_process(Pad.ref(:input, _id) = pad, buffer, _ctx, state) do
%Membrane.Buffer{
Expand Down Expand Up @@ -189,8 +168,7 @@ defmodule Membrane.RTP.SSRCRouter do

if waiting_for_linking?(ssrc, state) do
state = update_in(state, [:linking_buffers, ssrc], &[action | &1])
actions = if type == :buffer, do: [demand: {state.input_pads[ssrc], &(&1 + 1)}], else: []
{actions, state}
{[], state}
else
{[action], state}
end
Expand Down
Loading

0 comments on commit 47cdf49

Please sign in to comment.