Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement transcoding #21

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions lib/boombox/mp4.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ defmodule Boombox.MP4 do
get_child(:mp4_demuxer)
|> via_out(Pad.ref(:output, id))
|> child(:mp4_in_aac_parser, Membrane.AAC.Parser)
|> child(:mp4_in_aac_decoder, Membrane.AAC.FDK.Decoder)

{:audio, spec}

Expand All @@ -32,15 +31,22 @@ defmodule Boombox.MP4 do
{:video, spec}
end)

%Ready{track_builders: track_builders}
track_formats =
Map.new(tracks, fn
{_id, %Membrane.AAC{} = format} -> {:audio, format}
{_id, %Membrane.H264{} = format} -> {:video, format}
end)

%Ready{track_builders: track_builders, track_formats: track_formats}
end

@spec link_output(
String.t(),
Boombox.Pipeline.track_builders(),
Boombox.Pipeline.track_formats(),
Membrane.ChildrenSpec.t()
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
def link_output(location, track_builders, track_formats, spec_builder) do
spec =
[
spec_builder,
Expand All @@ -49,7 +55,10 @@ defmodule Boombox.MP4 do
Enum.map(track_builders, fn
{:audio, builder} ->
builder
|> child(:mp4_out_aac_encoder, Membrane.AAC.FDK.Encoder)
|> child(:audio_transcoder, %Boombox.Transcoders.Audio{
input_stream_format: track_formats.audio,
output_stream_format_module: Membrane.AAC
})
|> child(:mp4_out_aac_parser, %Membrane.AAC.Parser{
out_encapsulation: :none,
output_config: :esds
Expand Down
24 changes: 17 additions & 7 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,23 @@ defmodule Boombox.Pipeline do
optional(:video) => Membrane.ChildrenSpec.t()
}

@type track_formats :: %{
optional(:audio) => struct(),
optional(:video) => struct()
}

defmodule Ready do
@moduledoc false

@type t :: %__MODULE__{
actions: [Membrane.Pipeline.Action.t()],
track_builders: Boombox.Pipeline.track_builders() | nil,
track_formats: Boombox.Pipeline.track_formats() | nil,
spec_builder: Membrane.ChildrenSpec.t() | nil,
eos_info: term
}

defstruct actions: [], track_builders: nil, spec_builder: [], eos_info: nil
defstruct actions: [], track_builders: nil, track_formats: nil, spec_builder: [], eos_info: nil
end

defmodule Wait do
Expand All @@ -59,6 +65,7 @@ defmodule Boombox.Pipeline do
actions_acc: [],
spec_builder: [],
track_builders: nil,
track_formats: nil,
last_result: nil,
eos_info: nil
]
Expand Down Expand Up @@ -87,6 +94,7 @@ defmodule Boombox.Pipeline do
actions_acc: [Membrane.Pipeline.Action.t()],
spec_builder: Membrane.ChildrenSpec.t(),
track_builders: Boombox.Pipeline.track_builders() | nil,
track_formats: Boombox.Pipeline.track_formats() | nil,
last_result: Boombox.Pipeline.Ready.t() | Boombox.Pipeline.Wait.t() | nil,
eos_info: term()
}
Expand Down Expand Up @@ -126,6 +134,7 @@ defmodule Boombox.Pipeline do

Boombox.WebRTC.handle_output_tracks_negotiated(
state.track_builders,
state.track_formats,
state.spec_builder,
tracks
)
Expand Down Expand Up @@ -207,13 +216,13 @@ defmodule Boombox.Pipeline do
ctx,
%{
status: :input_ready,
last_result: %Ready{track_builders: track_builders, spec_builder: spec_builder}
last_result: %Ready{} = ready
} = state
)
when track_builders != nil do
state = %{state | track_builders: track_builders, spec_builder: spec_builder}
state = %{state | track_builders: ready.track_builders, track_formats: ready.track_formats, spec_builder: ready.spec_builder}

link_output(state.output, track_builders, spec_builder, ctx)
link_output(state.output, track_builders, track_formats, spec_builder, ctx)
|> do_proceed(:output_linked, :awaiting_output_link, ctx, state)
end

Expand Down Expand Up @@ -278,16 +287,17 @@ defmodule Boombox.Pipeline do
@spec link_output(
Boombox.output(),
track_builders(),
track_formats(),
Membrane.ChildrenSpec.t(),
Membrane.Pipeline.CallbackContext.t()
) ::
Ready.t() | Wait.t()
defp link_output({:webrtc, _signaling}, track_builders, _spec_builder, _ctx) do
defp link_output({:webrtc, _signaling}, track_builders, _track_formats, _spec_builder, _ctx) do
Boombox.WebRTC.link_output(track_builders)
end

defp link_output({:file, :mp4, location}, track_builders, spec_builder, _ctx) do
Boombox.MP4.link_output(location, track_builders, spec_builder)
defp link_output({:file, :mp4, location}, track_builders, track_formats, spec_builder, _ctx) do
Boombox.MP4.link_output(location, track_builders, track_formats, spec_builder)
end

defp parse_input(input) when is_binary(input) do
Expand Down
4 changes: 3 additions & 1 deletion lib/boombox/rtmp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ defmodule Boombox.RTMP do
video: get_child(:rtmp_source) |> via_out(:video)
}

%Ready{spec_builder: spec, track_builders: track_builders}
track_formats = %{audio: nil, video: nil}

%Ready{spec_builder: spec, track_builders: track_builders, track_formats: track_formats}
end
end
128 changes: 128 additions & 0 deletions lib/boombox/transcoders/audio.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
defmodule Boombox.Transcoders.Audio do
@moduledoc false

use Membrane.Bin

require Membrane.Logger

alias Boombox.Transcoders.DataReceiver
alias Membrane.Funnel

def_input_pad :input,
accepted_format: any_of(Membrane.AAC, Membrane.Opus)

def_output_pad :output,
accepted_format: any_of(Membrane.AAC, Membrane.Opus)

@type stream_format :: Membrane.AAC.t() | Membrane.Opus.t()
@type stream_format_module :: Membrane.AAC | Membrane.Opus

@opus_sample_rate 48_000

def_options input_stream_format: [
spec: stream_format(),
default: nil,
description: """
Format of the input stream.

If set to nil, bin will resolve it based on the input stream format coming via the \
`:input` pad.
"""
],
output_stream_format_module: [
spec: stream_format_module(),
description: """
Format of the output stream.

Input stream will be transcoded, if it doesn't match the output stream format.
"""
]

@impl true
def handle_init(_ctx, opts) do
spec = [
bin_input() |> child(:data_receiver, DataReceiver),
child(:output_funnel, Funnel) |> bin_output()
]

state =
Map.from_struct(opts)
|> Map.put(:input_linked_with_output?, false)

{link_actions, state} = maybe_link_input_with_output(state)
{[spec: spec] ++ link_actions, state}
end

@impl true
def handle_child_notification(
{:input_stream_format, stream_format},
:data_receiver,
_ctx,
state
) do
%{state | input_stream_format: stream_format}
|> maybe_link_input_with_output()
end

@impl true
def handle_child_notification(_notification, _element, _ctx, state) do
{[], state}
end

defp maybe_link_input_with_output(state)
when state.input_linked_with_output? or state.input_stream_format == nil do
{[], state}
end

defp maybe_link_input_with_output(state) do
spec =
link_input_with_output(
state.input_stream_format,
state.output_stream_format_module
)

state = %{state | input_linked_with_output?: true}
{[spec: spec], state}
end

defp link_input_with_output(%format{}, format) do
Membrane.Logger.debug("""
This bin will only forward buffers, as the input stream format is the same as the output stream format.
""")

get_child(:data_receiver)
|> get_child(:output_funnel)
end

defp link_input_with_output(%Membrane.Opus{}, Membrane.AAC) do
get_child(:data_receiver)
|> child(:opus_decoder, Membrane.Opus.Decoder)
|> child(:aac_encoder, Membrane.AAC.FDK.Encoder)
|> get_child(:output_funnel)
end

defp link_input_with_output(%Membrane.AAC{sample_rate: @opus_sample_rate}, Membrane.Opus) do
get_child(:data_receiver)
|> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
|> child(:opus_encoder, Membrane.Opus.Encoder)
|> get_child(:output_funnel)
end

defp link_input_with_output(%Membrane.AAC{} = input_format, Membrane.Opus) do
get_child(:data_receiver)
|> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
|> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{
output_stream_format: %Membrane.RawAudio{
sample_format: :s16le,
sample_rate: @opus_sample_rate,
channels: input_format.channels
}
})
|> child(:opus_encoder, Membrane.Opus.Encoder)
|> get_child(:output_funnel)
end

defp link_input_with_output(input_format, output_format_module) do
raise "Cannot transcode #{inspect(input_format)} to #{inspect(output_format_module)} yet"
end
end
100 changes: 100 additions & 0 deletions lib/boombox/transcoders/data_receiver.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
defmodule Boombox.Transcoders.DataReceiver do
@moduledoc false

# An Element that
# - is linked to the input od Transcoder
# - notifies parent (Transcoder) about received stream format
# - buffers incoming data, until output pad is linked

use Membrane.Filter

alias Membrane.TimestampQueue

def_input_pad :input,
accepted_format: any_of(Membrane.AAC, Membrane.Opus)

def_output_pad :output,
accepted_format: any_of(Membrane.AAC, Membrane.Opus),
availability: :on_request

defguardp is_output_linked(state) when state.output_pad_ref != nil

@impl true
def handle_init(_ctx, _opts), do: {[], %{queue: TimestampQueue.new(), output_pad_ref: nil}}

@impl true
def handle_playing(ctx, state), do: maybe_flush_queue(ctx, state)

@impl true
def handle_pad_added(output_pad_ref, ctx, state) do
output_pads_number = Map.keys(ctx.pads) |> Enum.count(&(&1 != :input))

if output_pads_number > 1 do
raise "#{inspect(__MODULE__)} can have only one output pad, but it has #{output_pads_number}"
end

state = %{state | output_pad_ref: output_pad_ref}
maybe_flush_queue(ctx, state)
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) when is_output_linked(state) do
actions = [
notify_parent: {:input_stream_format, stream_format},
stream_format: {state.output_pad_ref, stream_format}
]

{actions, state}
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
queue = TimestampQueue.push_stream_format(state.queue, :input, stream_format)
{[notify_parent: {:input_stream_format, stream_format}], %{state | queue: queue}}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) when is_output_linked(state) do
{[buffer: {state.output_pad_ref, buffer}], state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
{_suggested_actions, queue} = TimestampQueue.push_buffer(state.queue, :input, buffer)
{[], %{state | queue: queue}}
end

@impl true
def handle_event(_pad, event, _ctx, state) when is_output_linked(state) do
{[forward: event], state}
end

@impl true
def handle_event(:input, event, _ctx, state) do
queue = TimestampQueue.push_event(state.queue, :input, event)
{[], %{state | queue: queue}}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) when is_output_linked(state) do
{[end_of_stream: state.output_pad_ref], state}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
queue = TimestampQueue.push_end_of_stream(state.queue, :input)
{[], %{state | queue: queue}}
end

defp maybe_flush_queue(ctx, state) when ctx.playback == :playing and is_output_linked(state) do
{_suggested_actions, items, queue} = TimestampQueue.flush_and_close(state.queue)

actions =
Enum.map(items, fn
{:input, {item_type, item}} -> {item_type, {state.output_pad_ref, item}}
{:input, :end_of_stream} -> {:end_of_stream, state.output_pad_ref}
end)

{actions, %{state | queue: queue}}
end
end
Loading