diff --git a/lib/boombox/mp4.ex b/lib/boombox/mp4.ex
index db5da1c..b779a56 100644
--- a/lib/boombox/mp4.ex
+++ b/lib/boombox/mp4.ex
@@ -23,7 +23,6 @@ defmodule Boombox.MP4 do
             get_child(:mp4_demuxer)
             |> via_out(Pad.ref(:output, id))
             |> child(:mp4_in_aac_parser, Membrane.AAC.Parser)
-            |> child(:mp4_in_aac_decoder, Membrane.AAC.FDK.Decoder)
 
           {:audio, spec}
 
@@ -32,15 +31,22 @@ defmodule Boombox.MP4 do
           {:video, spec}
       end)
 
-    %Ready{track_builders: track_builders}
+    track_formats =
+      Map.new(tracks, fn
+        {_id, %Membrane.AAC{} = format} -> {:audio, format}
+        {_id, %Membrane.H264{} = format} -> {:video, format}
+      end)
+
+    %Ready{track_builders: track_builders, track_formats: track_formats}
   end
 
   @spec link_output(
           String.t(),
           Boombox.Pipeline.track_builders(),
+          Boombox.Pipeline.track_formats(),
           Membrane.ChildrenSpec.t()
         ) :: Ready.t()
-  def link_output(location, track_builders, spec_builder) do
+  def link_output(location, track_builders, track_formats, spec_builder) do
     spec =
       [
         spec_builder,
@@ -49,7 +55,10 @@ defmodule Boombox.MP4 do
         Enum.map(track_builders, fn
           {:audio, builder} ->
             builder
-            |> child(:mp4_out_aac_encoder, Membrane.AAC.FDK.Encoder)
+            |> child(:audio_transcoder, %Boombox.Transcoders.Audio{
+              input_stream_format: track_formats.audio,
+              output_stream_format_module: Membrane.AAC
+            })
             |> child(:mp4_out_aac_parser, %Membrane.AAC.Parser{
               out_encapsulation: :none,
               output_config: :esds
diff --git a/lib/boombox/pipeline.ex b/lib/boombox/pipeline.ex
index 6b414aa..a0fdd15 100644
--- a/lib/boombox/pipeline.ex
+++ b/lib/boombox/pipeline.ex
@@ -29,17 +29,27 @@ defmodule Boombox.Pipeline do
           optional(:video) => Membrane.ChildrenSpec.t()
         }
 
+  @type track_formats :: %{
+          optional(:audio) => struct() | nil,
+          optional(:video) => struct() | nil
+        }
+
   defmodule Ready do
     @moduledoc false
 
     @type t :: %__MODULE__{
             actions: [Membrane.Pipeline.Action.t()],
             track_builders: Boombox.Pipeline.track_builders() | nil,
+            track_formats: Boombox.Pipeline.track_formats() | nil,
             spec_builder: Membrane.ChildrenSpec.t() | nil,
             eos_info: term
           }
 
-    defstruct actions: [], track_builders: nil, spec_builder: [], eos_info: nil
+    defstruct actions: [],
+              track_builders: nil,
+              track_formats: nil,
+              spec_builder: [],
+              eos_info: nil
   end
 
   defmodule Wait do
@@ -59,6 +69,7 @@ defmodule Boombox.Pipeline do
                   actions_acc: [],
                   spec_builder: [],
                   track_builders: nil,
+                  track_formats: nil,
                   last_result: nil,
                   eos_info: nil
                 ]
@@ -87,6 +98,7 @@ defmodule Boombox.Pipeline do
             actions_acc: [Membrane.Pipeline.Action.t()],
             spec_builder: Membrane.ChildrenSpec.t(),
             track_builders: Boombox.Pipeline.track_builders() | nil,
+            track_formats: Boombox.Pipeline.track_formats() | nil,
             last_result: Boombox.Pipeline.Ready.t() | Boombox.Pipeline.Wait.t() | nil,
             eos_info: term()
           }
@@ -126,6 +138,7 @@ defmodule Boombox.Pipeline do
 
     Boombox.WebRTC.handle_output_tracks_negotiated(
       state.track_builders,
+      state.track_formats,
       state.spec_builder,
       tracks
     )
@@ -207,13 +220,18 @@ defmodule Boombox.Pipeline do
          ctx,
          %{
            status: :input_ready,
-           last_result: %Ready{track_builders: track_builders, spec_builder: spec_builder}
+           last_result: %Ready{track_builders: track_builders} = ready
          } = state
        )
        when track_builders != nil do
-    state = %{state | track_builders: track_builders, spec_builder: spec_builder}
+    state = %{
+      state
+      | track_builders: track_builders,
+        track_formats: ready.track_formats,
+        spec_builder: ready.spec_builder
+    }
 
-    link_output(state.output, track_builders, spec_builder, ctx)
+    link_output(state.output, track_builders, ready.track_formats, ready.spec_builder, ctx)
     |> do_proceed(:output_linked, :awaiting_output_link, ctx, state)
   end
 
@@ -278,16 +296,17 @@ defmodule Boombox.Pipeline do
   @spec link_output(
           Boombox.output(),
           track_builders(),
+          track_formats(),
           Membrane.ChildrenSpec.t(),
           Membrane.Pipeline.CallbackContext.t()
         ) ::
           Ready.t() | Wait.t()
-  defp link_output({:webrtc, _signaling}, track_builders, _spec_builder, _ctx) do
+  defp link_output({:webrtc, _signaling}, track_builders, _track_formats, _spec_builder, _ctx) do
     Boombox.WebRTC.link_output(track_builders)
   end
 
-  defp link_output({:file, :mp4, location}, track_builders, spec_builder, _ctx) do
-    Boombox.MP4.link_output(location, track_builders, spec_builder)
+  defp link_output({:file, :mp4, location}, track_builders, track_formats, spec_builder, _ctx) do
+    Boombox.MP4.link_output(location, track_builders, track_formats, spec_builder)
   end
 
   defp parse_input(input) when is_binary(input) do
diff --git a/lib/boombox/rtmp.ex b/lib/boombox/rtmp.ex
index c474a5e..23f43bf 100644
--- a/lib/boombox/rtmp.ex
+++ b/lib/boombox/rtmp.ex
@@ -51,6 +51,8 @@ defmodule Boombox.RTMP do
       video: get_child(:rtmp_source) |> via_out(:video)
     }
 
-    %Ready{spec_builder: spec, track_builders: track_builders}
+    track_formats = %{audio: nil, video: nil}
+
+    %Ready{spec_builder: spec, track_builders: track_builders, track_formats: track_formats}
   end
 end
diff --git a/lib/boombox/transcoders/audio.ex b/lib/boombox/transcoders/audio.ex
new file mode 100644
index 0000000..38be79d
--- /dev/null
+++ b/lib/boombox/transcoders/audio.ex
@@ -0,0 +1,131 @@
+defmodule Boombox.Transcoders.Audio do
+  @moduledoc false
+
+  use Membrane.Bin
+
+  require Membrane.Logger
+
+  alias Boombox.Transcoders.DataReceiver
+  alias Membrane.Funnel
+
+  def_input_pad :input,
+    accepted_format: any_of(Membrane.AAC, Membrane.Opus)
+
+  def_output_pad :output,
+    accepted_format: any_of(Membrane.AAC, Membrane.Opus)
+
+  @type stream_format :: Membrane.AAC.t() | Membrane.Opus.t()
+  @type stream_format_module :: Membrane.AAC | Membrane.Opus
+
+  @opus_sample_rate 48_000
+
+  def_options input_stream_format: [
+                spec: stream_format() | nil,
+                default: nil,
+                description: """
+                Format of the input stream.
+
+                If set to nil, bin will resolve it based on the input stream format coming via the \
+                `:input` pad.
+                """
+              ],
+              output_stream_format_module: [
+                spec: stream_format_module(),
+                description: """
+                Format of the output stream.
+
+                Input stream will be transcoded, if it doesn't match the output stream format.
+                """
+              ]
+
+  @impl true
+  def handle_init(_ctx, opts) do
+    spec = [
+      bin_input() |> child(:data_receiver, DataReceiver),
+      child(:output_funnel, Funnel) |> bin_output()
+    ]
+
+    state =
+      Map.from_struct(opts)
+      |> Map.put(:input_linked_with_output?, false)
+
+    {link_actions, state} = maybe_link_input_with_output(state)
+    {[spec: spec] ++ link_actions, state}
+  end
+
+  @impl true
+  def handle_child_notification(
+        {:input_stream_format, stream_format},
+        :data_receiver,
+        _ctx,
+        state
+      ) do
+    %{state | input_stream_format: stream_format}
+    |> maybe_link_input_with_output()
+  end
+
+  @impl true
+  def handle_child_notification(_notification, _element, _ctx, state) do
+    {[], state}
+  end
+
+  defp maybe_link_input_with_output(state)
+       when state.input_linked_with_output? or state.input_stream_format == nil do
+    {[], state}
+  end
+
+  defp maybe_link_input_with_output(state) do
+    spec =
+      link_input_with_output(
+        state.input_stream_format,
+        state.output_stream_format_module
+      )
+
+    state = %{state | input_linked_with_output?: true}
+    {[spec: spec], state}
+  end
+
+  # Input struct module equals the requested output module: nothing to transcode.
+  defp link_input_with_output(%format{}, format) do
+    Membrane.Logger.debug("""
+    This bin will only forward buffers, as the input stream format is the same as the output stream format.
+    """)
+
+    get_child(:data_receiver)
+    |> get_child(:output_funnel)
+  end
+
+  defp link_input_with_output(%Membrane.Opus{}, Membrane.AAC) do
+    get_child(:data_receiver)
+    |> child(:opus_decoder, Membrane.Opus.Decoder)
+    |> child(:aac_encoder, Membrane.AAC.FDK.Encoder)
+    |> get_child(:output_funnel)
+  end
+
+  # AAC already sampled at the Opus rate: decode and re-encode without resampling.
+  defp link_input_with_output(%Membrane.AAC{sample_rate: @opus_sample_rate}, Membrane.Opus) do
+    get_child(:data_receiver)
+    |> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
+    |> child(:opus_encoder, Membrane.Opus.Encoder)
+    |> get_child(:output_funnel)
+  end
+
+  # Any other AAC sample rate: resample to the Opus rate before encoding.
+  defp link_input_with_output(%Membrane.AAC{} = input_format, Membrane.Opus) do
+    get_child(:data_receiver)
+    |> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
+    |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{
+      output_stream_format: %Membrane.RawAudio{
+        sample_format: :s16le,
+        sample_rate: @opus_sample_rate,
+        channels: input_format.channels
+      }
+    })
+    |> child(:opus_encoder, Membrane.Opus.Encoder)
+    |> get_child(:output_funnel)
+  end
+
+  defp link_input_with_output(input_format, output_format_module) do
+    raise "Cannot transcode #{inspect(input_format)} to #{inspect(output_format_module)} yet"
+  end
+end
diff --git a/lib/boombox/transcoders/data_receiver.ex b/lib/boombox/transcoders/data_receiver.ex
new file mode 100644
index 0000000..2051af4
--- /dev/null
+++ b/lib/boombox/transcoders/data_receiver.ex
@@ -0,0 +1,103 @@
+defmodule Boombox.Transcoders.DataReceiver do
+  @moduledoc false
+
+  # An Element that
+  # - is linked to the input of Transcoder
+  # - notifies parent (Transcoder) about received stream format
+  # - buffers incoming data, until output pad is linked
+
+  use Membrane.Filter
+
+  alias Membrane.TimestampQueue
+
+  def_input_pad :input,
+    accepted_format: any_of(Membrane.AAC, Membrane.Opus)
+
+  def_output_pad :output,
+    accepted_format: any_of(Membrane.AAC, Membrane.Opus),
+    availability: :on_request
+
+  defguardp is_output_linked(state) when state.output_pad_ref != nil
+
+  @impl true
+  def handle_init(_ctx, _opts), do: {[], %{queue: TimestampQueue.new(), output_pad_ref: nil}}
+
+  @impl true
+  def handle_playing(ctx, state), do: maybe_flush_queue(ctx, state)
+
+  @impl true
+  def handle_pad_added(output_pad_ref, ctx, state) do
+    output_pads_number = Map.keys(ctx.pads) |> Enum.count(&(&1 != :input))
+
+    if output_pads_number > 1 do
+      raise "#{inspect(__MODULE__)} can have only one output pad, but it has #{output_pads_number}"
+    end
+
+    state = %{state | output_pad_ref: output_pad_ref}
+    maybe_flush_queue(ctx, state)
+  end
+
+  @impl true
+  def handle_stream_format(:input, stream_format, _ctx, state) when is_output_linked(state) do
+    actions = [
+      notify_parent: {:input_stream_format, stream_format},
+      stream_format: {state.output_pad_ref, stream_format}
+    ]
+
+    {actions, state}
+  end
+
+  @impl true
+  def handle_stream_format(:input, stream_format, _ctx, state) do
+    queue = TimestampQueue.push_stream_format(state.queue, :input, stream_format)
+    {[notify_parent: {:input_stream_format, stream_format}], %{state | queue: queue}}
+  end
+
+  @impl true
+  def handle_buffer(:input, buffer, _ctx, state) when is_output_linked(state) do
+    {[buffer: {state.output_pad_ref, buffer}], state}
+  end
+
+  @impl true
+  def handle_buffer(:input, buffer, _ctx, state) do
+    {_suggested_actions, queue} = TimestampQueue.push_buffer(state.queue, :input, buffer)
+    {[], %{state | queue: queue}}
+  end
+
+  @impl true
+  def handle_event(_pad, event, _ctx, state) when is_output_linked(state) do
+    {[forward: event], state}
+  end
+
+  @impl true
+  def handle_event(:input, event, _ctx, state) do
+    queue = TimestampQueue.push_event(state.queue, :input, event)
+    {[], %{state | queue: queue}}
+  end
+
+  @impl true
+  def handle_end_of_stream(:input, _ctx, state) when is_output_linked(state) do
+    {[end_of_stream: state.output_pad_ref], state}
+  end
+
+  @impl true
+  def handle_end_of_stream(:input, _ctx, state) do
+    queue = TimestampQueue.push_end_of_stream(state.queue, :input)
+    {[], %{state | queue: queue}}
+  end
+
+  defp maybe_flush_queue(ctx, state) when ctx.playback == :playing and is_output_linked(state) do
+    {_suggested_actions, items, queue} = TimestampQueue.flush_and_close(state.queue)
+
+    actions =
+      Enum.map(items, fn
+        {:input, {item_type, item}} -> {item_type, {state.output_pad_ref, item}}
+        {:input, :end_of_stream} -> {:end_of_stream, state.output_pad_ref}
+      end)
+
+    {actions, %{state | queue: queue}}
+  end
+
+  # Not playing yet, or the output pad is not linked yet: there is nothing to flush now.
+  defp maybe_flush_queue(_ctx, state), do: {[], state}
+end
diff --git a/lib/boombox/webrtc.ex b/lib/boombox/webrtc.ex
index 9d03d9c..8f4e2a8 100644
--- a/lib/boombox/webrtc.ex
+++ b/lib/boombox/webrtc.ex
@@ -21,12 +21,11 @@ defmodule Boombox.WebRTC do
   @spec handle_input_tracks(Membrane.WebRTC.Source.new_tracks()) :: Ready.t()
   def handle_input_tracks(tracks) do
     track_builders =
       Map.new(tracks, fn
         %{kind: :audio, id: id} ->
           spec =
             get_child(:webrtc_input)
             |> via_out(Pad.ref(:output, id))
-            |> child(:webrtc_in_opus_decoder, Membrane.Opus.Decoder)
 
           {:audio, spec}
 
@@ -38,7 +37,9 @@ defmodule Boombox.WebRTC do
           {:video, spec}
       end)
 
-    %Ready{track_builders: track_builders}
+    track_formats = Map.new(tracks, &{&1.kind, nil})
+
+    %Ready{track_builders: track_builders, track_formats: track_formats}
   end
 
   @spec create_output(Boombox.webrtc_opts()) :: Ready.t()
@@ -63,10 +64,11 @@ defmodule Boombox.WebRTC do
 
   @spec handle_output_tracks_negotiated(
           Boombox.Pipeline.track_builders(),
+          Boombox.Pipeline.track_formats(),
           Membrane.ChildrenSpec.t(),
           Membrane.WebRTC.Sink.new_tracks()
         ) :: Ready.t()
-  def handle_output_tracks_negotiated(track_builders, spec_builder, tracks) do
+  def handle_output_tracks_negotiated(track_builders, track_formats, spec_builder, tracks) do
     tracks = Map.new(tracks, &{&1.kind, &1.id})
 
     spec = [
@@ -74,14 +76,10 @@ defmodule Boombox.WebRTC do
       Enum.map(track_builders, fn
         {:audio, builder} ->
           builder
-          |> child(:webrtc_out_resampler, %Membrane.FFmpeg.SWResample.Converter{
-            output_stream_format: %Membrane.RawAudio{
-              sample_format: :s16le,
-              sample_rate: 48_000,
-              channels: 2
-            }
+          |> child(:audio_transcoder, %Boombox.Transcoders.Audio{
+            input_stream_format: track_formats[:audio],
+            output_stream_format_module: Membrane.Opus
           })
-          |> child(:webrtc_out_opus_encoder, Membrane.Opus.Encoder)
           |> child(:webrtc_out_audio_realtimer, Membrane.Realtimer)
           |> via_in(Pad.ref(:input, tracks.audio), options: [kind: :audio])
           |> get_child(:webrtc_output)
diff --git a/mix.exs b/mix.exs
index aad5ecf..1a24ead 100644
--- a/mix.exs
+++ b/mix.exs
@@ -47,6 +47,7 @@ defmodule Boombox.Mixfile do
       {:membrane_realtimer_plugin, "~> 0.9.0"},
       {:membrane_rtmp_plugin, github: "membraneframework/membrane_rtmp_plugin"},
       {:membrane_ffmpeg_swresample_plugin, "~> 0.20.0"},
+      {:membrane_timestamp_queue, "~> 0.2.2"},
       {:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
       {:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
       {:credo, ">= 0.0.0", only: :dev, runtime: false}
diff --git a/mix.lock b/mix.lock
index 07dc522..bf1486d 100644
--- a/mix.lock
+++ b/mix.lock
@@ -62,6 +62,7 @@
   "membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.29.0", "0277310eb599b8e6de9e0b864807f23b3b245865e39a28f0cbab695d1f2c157e", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "1b3fd808114e06332b6a4e000238998a9188d1ef625c414ca3239aee70f0775d"},
   "membrane_rtp_vp8_plugin": {:hex, :membrane_rtp_vp8_plugin, "0.9.1", "9e8a74d764730a23382ba862a238963c9639b4c6963238caeb6fe2449a66add8", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_vp8_format, "~> 0.4.0", [hex: :membrane_vp8_format, repo: "hexpm", optional: false]}], "hexpm", "704856eb2734bb6ea5cc47242c241de45debb5724a81cffb344bacda9867fe98"},
   "membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"},
+  "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
   "membrane_vp8_format": {:hex, :membrane_vp8_format, "0.4.0", "6c29ec67479edfbab27b11266dc92f18f3baf4421262c5c31af348c33e5b92c7", [:mix], [], "hexpm", "8bb005ede61db8fcb3535a883f32168b251c2dfd1109197c8c3b39ce28ed08e2"},
   "membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.21.0", "0d47a6ffe3eb18abf43e9f6d089a409120ecd5cff43095d065fbb9e1c038f79c", [:mix], [{:bandit, "~> 1.2", [hex: :bandit, repo: "hexpm", optional: false]}, {:ex_webrtc, "~> 0.3.0", [hex: :ex_webrtc, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, "~> 0.9.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.29.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.9.1", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.0", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "39d383eadb1b1ce10975ac8505012e901c8961e6f5a65577ff0fbf03b7bc8fc7"},
   "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},