From e579e75c5dd9d634649aa77c1d6f6f06daed4824 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 12 Aug 2024 17:41:52 +0200 Subject: [PATCH] wip --- lib/boombox/mp4.ex | 21 ++--- lib/boombox/rtmp.ex | 12 ++- lib/boombox/transcoders/audio.ex | 96 ++++++++++++++++------- lib/boombox/transcoders/data_receiver.ex | 98 ++++++++++++++++++++++++ lib/boombox/webrtc.ex | 29 +++---- 5 files changed, 203 insertions(+), 53 deletions(-) create mode 100644 lib/boombox/transcoders/data_receiver.ex diff --git a/lib/boombox/mp4.ex b/lib/boombox/mp4.ex index db5da1c..d55d464 100644 --- a/lib/boombox/mp4.ex +++ b/lib/boombox/mp4.ex @@ -17,19 +17,18 @@ defmodule Boombox.MP4 do @spec handle_input_tracks(Membrane.MP4.Demuxer.ISOM.new_tracks_t()) :: Ready.t() def handle_input_tracks(tracks) do track_builders = - Map.new(tracks, fn - {id, %Membrane.AAC{}} -> + Enum.map(tracks, fn + {id, %Membrane.AAC{} = format} -> spec = get_child(:mp4_demuxer) |> via_out(Pad.ref(:output, id)) |> child(:mp4_in_aac_parser, Membrane.AAC.Parser) - |> child(:mp4_in_aac_decoder, Membrane.AAC.FDK.Decoder) - {:audio, spec} + {:audio, format, spec} - {id, %Membrane.H264{}} -> + {id, %Membrane.H264{} = format} -> spec = get_child(:mp4_demuxer) |> via_out(Pad.ref(:output, id)) - {:video, spec} + {:video, format, spec} end) %Ready{track_builders: track_builders} @@ -47,9 +46,13 @@ defmodule Boombox.MP4 do child(:mp4_muxer, Membrane.MP4.Muxer.ISOM) |> child(:mp4_file_sink, %Membrane.File.Sink{location: location}), Enum.map(track_builders, fn - {:audio, builder} -> + {:audio, format, builder} -> builder - |> child(:mp4_out_aac_encoder, Membrane.AAC.FDK.Encoder) + # |> child(:mp4_out_aac_encoder, Membrane.AAC.FDK.Encoder) + |> child(:audio_transcoder, %Boombox.Transcoders.Audio{ + input_stream_format: format, + output_stream_format: Membrane.AAC + }) |> child(:mp4_out_aac_parser, %Membrane.AAC.Parser{ out_encapsulation: :none, output_config: :esds @@ -57,7 +60,7 @@ defmodule Boombox.MP4 do |> via_in(Pad.ref(:input, :audio)) |> get_child(:mp4_muxer) - {:video, builder} -> + {:video, _format, builder} -> builder |> child(:mp4_out_h264_parser, %Membrane.H264.Parser{output_stream_structure: :avc3}) |> via_in(Pad.ref(:input, :video)) diff --git a/lib/boombox/rtmp.ex b/lib/boombox/rtmp.ex index c474a5e..3c19296 100644 --- a/lib/boombox/rtmp.ex +++ b/lib/boombox/rtmp.ex @@ -46,10 +46,16 @@ defmodule Boombox.RTMP do |> child(:rtmp_in_aac_decoder, Membrane.AAC.FDK.Decoder) ] - track_builders = %{ - audio: get_child(:rtmp_in_aac_decoder), - video: get_child(:rtmp_source) |> via_out(:video) + track_builders = [ + {:audio, nil, get_child(:rtmp_in_aac_decoder)}, + {:video, nil, get_child(:rtmp_source) |> via_out(:video) } + ] + + # track_builders = %{ + # # audio: get_child(:rtmp_in_aac_decoder), + # video: get_child(:rtmp_source) |> via_out(:video) + # } %Ready{spec_builder: spec, track_builders: track_builders} end diff --git a/lib/boombox/transcoders/audio.ex b/lib/boombox/transcoders/audio.ex index d471e52..2267c68 100644 --- a/lib/boombox/transcoders/audio.ex +++ b/lib/boombox/transcoders/audio.ex @@ -33,58 +33,98 @@ defmodule Boombox.Transcoders.Audio do """ ] + @impl true + def handle_init(_ctx, %{input_stream_format: nil} = opts) do + spec = [ + bin_input() |> child(:data_receiver, DataReceiver), + child(:output_forward_filter, Funnel) |> bin_output() + ] + + state = + Map.from_struct(opts) + |> Map.put(:spec_generated?, false) + + {[spec: spec], state} + end + @impl true def handle_init(_ctx, opts) do - spec = generate_transcoding_spec(opts.input_stream_format, opts.output_stream_format) - {[spec: spec], Map.from_struct(opts)} + spec = + bin_input() + |> generate_transcoding_spec(opts.input_stream_format, opts.output_stream_format) + |> bin_output() + + state = + Map.from_struct(opts) + |> Map.put(:spec_generated?, true) + + {[spec: spec], state} end - defp generate_transcoding_spec(_format, _format) do + @impl true + def handle_child_notification( + {:input_stream_format, stream_format}, + :data_receiver, + _ctx, + %{spec_generated?: false} = state + ) do + spec = + get_child(:data_receiver) + |> generate_transcoding_spec(stream_format, state.output_stream_format) + |> get_child(:output_forward_filter) + + state = %{state | input_stream_format: stream_format, spec_generated?: true} + {[spec: spec], state} + end + + @impl true + def handle_child_notification(_notification, _element, _ctx, state) do + {[], state} + end + + defp generate_transcoding_spec(input_spec_builder, input_format, output_format) when output_format in [input_format, input_format.__struct__] do Membrane.Logger.debug(""" This bin will only forward buffers, as the input stream format is the same as the output stream format. """) - bin_input() + input_spec_builder |> child(:forwarding_filter, Membrane.Debug.Filter) - |> bin_output() end defp generate_transcoding_spec( + input_spec_builder, %Membrane.AAC{channels: channels} = input_format, - %Membrane.Opus{channels: channels} = output_format + Membrane.Opus ) do decoding_spec = - bin_input() + input_spec_builder |> child(:aac_decoder, Membrane.AAC.FDK.Decoder) - resampling_spec = - if input_format.sample_rate == @opus_sample_rate do - decoding_spec - else - decoding_spec - |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ - output_stream_format: %Membrane.RawAudio{ - sample_format: :s16le, - sample_rate: @opus_sample_rate, - channels: channels - } - }) - end - - resampling_spec - |> child(:opus_encoder, Membrane.Opus.Encoder) + if input_format.sample_rate == @opus_sample_rate do + decoding_spec + else + decoding_spec + |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ + output_stream_format: %Membrane.RawAudio{ + sample_format: :s16le, + sample_rate: @opus_sample_rate, + channels: channels + } + }) + end end defp generate_transcoding_spec( - %Membrane.Opus{channels: channels} = input_format, - %Membrane.AAC{channels: channels} = output_format + input_spec_builder, + %Membrane.Opus{}, + Membrane.AAC ) do - bin_input() + input_spec_builder |> child(:opus_decoder, Membrane.Opus.Decoder) |> child(:aac_encoder, Membrane.AAC.FDK.Encoder) end - defp generate_transcoding_spec(input_format, output_format) do - raise "Cannot transform #{inspect(input_format)} to #{input_format(output_format)} yet" + defp generate_transcoding_spec(_input_spec_builder, input_format, output_format) do + raise "Cannot transform #{inspect(input_format)} to #{inspect(output_format)} yet" end end diff --git a/lib/boombox/transcoders/data_receiver.ex b/lib/boombox/transcoders/data_receiver.ex new file mode 100644 index 0000000..15f02f5 --- /dev/null +++ b/lib/boombox/transcoders/data_receiver.ex @@ -0,0 +1,98 @@ +defmodule Boombox.Transcoders.DataReceiver do + @moduledoc false + + # An Element that + # - is linked to the input od Transcoder + # - notifies parent (Transcoder) about received stream format + # - buffers incoming data, until output pad is linked + + use Membrane.Filter + + alias Membrane.TimestampQueue + + def_input_pad :input, + accepted_format: any_of(Membrane.AAC, Membrane.Opus) + + def_output_pad :output, + accepted_format: any_of(Membrane.AAC, Membrane.Opus), + availability: :on_request + + defguardp is_output_linked(state) when state.output_pad_ref != nil + + @impl true + def handle_init(_ctx, _opts), do: {[], %{queue: TimestampQueue.new(), output_pad_ref: nil}} + + @impl true + def handle_playing(ctx, state), do: maybe_flush_queue(ctx, state) + + @impl true + def handle_pad_added(output_pad_ref, ctx, state) do + output_pads_number = Map.keys(ctx.pads) |> Enum.count(&(&1 != :input)) + + if output_pads_number > 1 do + raise "#{inspect(__MODULE__)} can have only one output pad, but it has #{output_pads_number}" + end + + state = %{state | output_pad_ref: output_pad_ref} + maybe_flush_queue(ctx, state) + end + + @impl true + def handle_stream_format(:input, stream_format, _ctx, state) do + {actions, state} = + if is_output_linked(state) do + {[stream_format: {state.output_pad_ref, stream_format}], state} + else + queue = TimestampQueue.push_stream_format(state.queue, :input, stream_format) + {[], %{state | queue: queue}} + end + + actions = actions ++ [notify_parent: {:input_stream_format, stream_format}] + {actions, state} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) when is_output_linked(state) do + {[buffer: {state.output_pad_ref, buffer}], state} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + {_suggested_actions, queue} = TimestampQueue.push_buffer(state.queue, :input, buffer) + {[], %{state | queue: queue}} + end + + @impl true + def handle_event(_pad, event, _ctx, state) when is_output_linked(state) do + {[forward: event], state} + end + + @impl true + def handle_event(:input, event, _ctx, state) do + queue = TimestampQueue.push_event(state.queue, :input, event) + {[], %{state | queue: queue}} + end + + @impl true + def handle_end_of_stream(:input, _ctx, state) when is_output_linked(state) do + {[end_of_stream: state.output_pad_ref], state} + end + + @impl true + def handle_end_of_stream(:input, _ctx, state) do + queue = TimestampQueue.push_end_of_stream(state.queue, :input) + {[], %{state | queue: queue}} + end + + defp maybe_flush_queue(ctx, state) when ctx.playback == :playing and is_output_linked(state) do + {_suggested_actions, items, queue} = TimestampQueue.flush_and_close(state.queue) + + actions = + Enum.map(items, fn + {:input, {item_type, item}} -> {item_type, {state.output_pad_ref, item}} + {:input, :end_of_stream} -> {:end_of_stream, state.output_pad_ref} + end) + + {actions, %{state | queue: queue}} + end +end diff --git a/lib/boombox/webrtc.ex b/lib/boombox/webrtc.ex index 9d03d9c..6b1aacd 100644 --- a/lib/boombox/webrtc.ex +++ b/lib/boombox/webrtc.ex @@ -21,21 +21,20 @@ defmodule Boombox.WebRTC do @spec handle_input_tracks(Membrane.WebRTC.Source.new_tracks()) :: Ready.t() def handle_input_tracks(tracks) do track_builders = - Map.new(tracks, fn + Enum.map(tracks, fn %{kind: :audio, id: id} -> spec = get_child(:webrtc_input) |> via_out(Pad.ref(:output, id)) - |> child(:webrtc_in_opus_decoder, Membrane.Opus.Decoder) - {:audio, spec} + {:audio, nil, spec} %{kind: :video, id: id} -> spec = get_child(:webrtc_input) |> via_out(Pad.ref(:output, id)) - {:video, spec} + {:video, nil, spec} end) %Ready{track_builders: track_builders} @@ -72,21 +71,25 @@ defmodule Boombox.WebRTC do spec = [ spec_builder, Enum.map(track_builders, fn - {:audio, builder} -> + {:audio, format, builder} -> builder - |> child(:webrtc_out_resampler, %Membrane.FFmpeg.SWResample.Converter{ - output_stream_format: %Membrane.RawAudio{ - sample_format: :s16le, - sample_rate: 48_000, - channels: 2 - } + # |> child(:webrtc_out_resampler, %Membrane.FFmpeg.SWResample.Converter{ + # output_stream_format: %Membrane.RawAudio{ + # sample_format: :s16le, + # sample_rate: 48_000, + # channels: 2 + # } + # }) + # |> child(:webrtc_out_opus_encoder, Membrane.Opus.Encoder) + |> child(:audio_transcoder, %Boombox.Transcoders.Audio{ + input_stream_format: format, + output_stream_format: Membrane.Opus }) - |> child(:webrtc_out_opus_encoder, Membrane.Opus.Encoder) |> child(:webrtc_out_audio_realtimer, Membrane.Realtimer) |> via_in(Pad.ref(:input, tracks.audio), options: [kind: :audio]) |> get_child(:webrtc_output) - {:video, builder} -> + {:video, _format, builder} -> builder |> child(:webrtc_out_video_realtimer, Membrane.Realtimer) |> child(:webrtc_out_h264_parser, %Membrane.H264.Parser{