diff --git a/.circleci/config.yml b/.circleci/config.yml index c077fb4..04ae00e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,16 +7,20 @@ workflows: build: jobs: - elixir/build_test: + cache-version: 6 filters: &filters tags: only: /v.*/ - elixir/test: + cache-version: 6 filters: <<: *filters - elixir/lint: + cache-version: 6 filters: <<: *filters - elixir/hex_publish: + cache-version: 6 requires: - elixir/build_test - elixir/test diff --git a/.github/workflows/fetch_changes.yml b/.github/workflows/fetch_changes.yml index 7c9c0e9..244ede4 100644 --- a/.github/workflows/fetch_changes.yml +++ b/.github/workflows/fetch_changes.yml @@ -32,7 +32,7 @@ jobs: # Runs a set of commands using the runners shell - name: Add remote run: | - git remote add source git@github.com:membraneframework/membrane_template_plugin.git + git remote add source git@github.com:membraneframework/boombox.git git remote update echo "CURRENT_BRANCH=$(git branch --show-current)" >> $GITHUB_ENV diff --git a/README.md b/README.md index e057e4c..818aa00 100644 --- a/README.md +++ b/README.md @@ -1,35 +1,31 @@ -# Membrane Template Plugin +# Boombox -[![Hex.pm](https://img.shields.io/hexpm/v/membrane_template_plugin.svg)](https://hex.pm/packages/membrane_template_plugin) -[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_template_plugin) -[![CircleCI](https://circleci.com/gh/membraneframework/membrane_template_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_template_plugin) +[![Hex.pm](https://img.shields.io/hexpm/v/boombox.svg)](https://hex.pm/packages/boombox) +[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/boombox) +[![CircleCI](https://circleci.com/gh/membraneframework/boombox.svg?style=svg)](https://circleci.com/gh/membraneframework/boombox) -This repository contains a template for new plugins. - -Check out different branches for other flavors of this template. - -It's a part of the [Membrane Framework](https://membrane.stream). +Boombox is a powerful tool for audio & video streaming based on the [Membrane Framework](https://membrane.stream). ## Installation -The package can be installed by adding `membrane_template_plugin` to your list of dependencies in `mix.exs`: +The package can be installed by adding `boombox` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:membrane_template_plugin, "~> 0.1.0"} + {:boombox, "~> 0.1.0"} ] end ``` ## Usage -TODO +See `examples.livemd` for usage examples. ## Copyright and License -Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin) +Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=boombox) -[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin) +[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=boombox) Licensed under the [Apache License, Version 2.0](LICENSE) diff --git a/examples.livemd b/examples.livemd new file mode 100644 index 0000000..7201126 --- /dev/null +++ b/examples.livemd @@ -0,0 +1,126 @@ +# Boombox examples + +```elixir +Logger.configure(level: :info) + +# For ffmpeg and ffplay commands to work on Mac Livebook Desktop +System.put_env("PATH", "/opt/homebrew/bin:#{System.get_env("PATH")}") + +Mix.install([{:boombox, path: __DIR__}, :kino]) +``` + +## Boombox + +```elixir +:ok = :inets.start() + +{:ok, _server} = + :inets.start(:httpd, + bind_address: ~c"localhost", + port: 1234, + document_root: ~c"#{__DIR__}/examples_assets/", + server_name: ~c"assets_server", + server_root: "/tmp", + erl_script_nocache: true + ) +``` + +```elixir +bbb_mp4 = "#{__DIR__}/test/fixtures/bun10s.mp4" +out_dir = "#{__DIR__}/examples_outputs" +``` + + + +## MP4 to WebRTC + +To receive the stream, visit http://localhost:1234/stream_to_browser/index.html after running the cell below + +```elixir +Boombox.run(input: bbb_mp4, output: {:webrtc, "ws://localhost:8830"}) +``` + + + +## WebRTC to MP4 + +To send the stream, visit http://localhost:1234/stream_from_browser/index.html + +```elixir +Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: "#{out_dir}/webrtc_to_mp4.mp4") +``` + +```elixir +System.shell("ffplay #{out_dir}/webrtc_to_mp4.mp4") +``` + + + +## WebRTC to WebRTC + +Visit http://localhost:1234/stream_from_browser/index.html to send the stream and http://localhost:1234/stream_to_browser/index.html to receive it + +```elixir +Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: {:webrtc, "ws://localhost:8830"}) +``` + + + +## RTMP to MP4 + +```elixir +uri = "rtmp://localhost:5432" + +t = + Task.async(fn -> + Boombox.run(input: uri, output: "#{out_dir}/rtmp_to_mp4.mp4") + end) + +{_output, 0} = System.shell("ffmpeg -re -i #{bbb_mp4} -c copy -f flv #{uri}") + +Task.await(t) +``` + +```elixir +System.shell("ffplay #{out_dir}/rtmp_to_mp4.mp4") +``` + + + +## RTMP to WebRTC + +To receive the stream, visit http://localhost:1234/stream_to_browser/index.html + +```elixir +uri = "rtmp://localhost:5432" + +t = + Task.async(fn -> + Boombox.run(input: uri, output: {:webrtc, "ws://localhost:8830"}) + end) + +{_output, 0} = System.shell("ffmpeg -re -i #{bbb_mp4} -c copy -f flv #{uri}") + +Task.await(t) +``` + + + +## MP4 via WebRTC to MP4 + +```elixir +signaling = Membrane.WebRTC.SignalingChannel.new() + +t = + Task.async(fn -> + Boombox.run(input: bbb_mp4, output: {:webrtc, signaling}) + end) + +Boombox.run(input: {:webrtc, signaling}, output: "#{out_dir}/mp4_webrtc_mp4.mp4") + +Task.await(t) +``` + +```elixir +System.shell("ffplay #{out_dir}/mp4_webrtc_mp4.mp4") +``` diff --git a/examples_assets/stream_from_browser/index.html b/examples_assets/stream_from_browser/index.html new file mode 100644 index 0000000..b75acf4 --- /dev/null +++ b/examples_assets/stream_from_browser/index.html @@ -0,0 +1,20 @@ + + + + + + + + Membrane WebRTC browser to file example + + + +
+

Membrane WebRTC browser to file example

+
Connecting
+
+ + + + \ No newline at end of file diff --git a/examples_assets/stream_from_browser/stream_from_browser.js b/examples_assets/stream_from_browser/stream_from_browser.js new file mode 100644 index 0000000..89c6720 --- /dev/null +++ b/examples_assets/stream_from_browser/stream_from_browser.js @@ -0,0 +1,58 @@ +const pcConfig = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' },] }; +const mediaConstraints = { video: true, audio: true } + +const ws = new WebSocket(`ws://localhost:8829`); +const connStatus = document.getElementById("status"); +ws.onopen = _ => start_connection(ws); +ws.onclose = event => { + connStatus.innerHTML = "Disconnected" + console.log("WebSocket connection was terminated:", event); +} + +const start_connection = async (ws) => { + const localStream = await navigator.mediaDevices.getUserMedia(mediaConstraints); + const pc = new RTCPeerConnection(pcConfig); + + pc.onicecandidate = event => { + if (event.candidate === null) return; + console.log("Sent ICE candidate:", event.candidate); + ws.send(JSON.stringify({ type: "ice_candidate", data: event.candidate })); + }; + + pc.onconnectionstatechange = () => { + if (pc.connectionState == "connected") { + const button = document.createElement('button'); + button.innerHTML = "Disconnect"; + button.onclick = () => { + ws.close(); + localStream.getTracks().forEach(track => track.stop()) + } + connStatus.innerHTML = "Connected "; + connStatus.appendChild(button); + } + } + + for (const track of localStream.getTracks()) { + pc.addTrack(track, localStream); + } + + ws.onmessage = async event => { + const { type, data } = JSON.parse(event.data); + + switch (type) { + case "sdp_answer": + console.log("Received SDP answer:", data); + await pc.setRemoteDescription(data); + break; + case "ice_candidate": + console.log("Recieved ICE candidate:", data); + await pc.addIceCandidate(data); + break; + } + }; + + const offer = await pc.createOffer(); + await pc.setLocalDescription(offer); + console.log("Sent SDP offer:", offer) + ws.send(JSON.stringify({ type: "sdp_offer", data: offer })); +}; diff --git a/examples_assets/stream_to_browser/index.html b/examples_assets/stream_to_browser/index.html new file mode 100644 index 0000000..fb51384 --- /dev/null +++ b/examples_assets/stream_to_browser/index.html @@ -0,0 +1,20 @@ + + + + + + + + Membrane WebRTC file to browser example + + + +
+

Membrane WebRTC file to browser example

+ +
+ + + + \ No newline at end of file diff --git a/examples_assets/stream_to_browser/stream_to_browser.js b/examples_assets/stream_to_browser/stream_to_browser.js new file mode 100644 index 0000000..976b112 --- /dev/null +++ b/examples_assets/stream_to_browser/stream_to_browser.js @@ -0,0 +1,37 @@ +const videoPlayer = document.getElementById("videoPlayer"); +const pcConfig = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' },] }; +const proto = window.location.protocol === "https:" ? "wss:" : "ws:" +const ws = new WebSocket(`${proto}//localhost:8830`); +ws.onopen = () => start_connection(ws); +ws.onclose = event => console.log("WebSocket connection was terminated:", event); + +const start_connection = async (ws) => { + videoPlayer.srcObject = new MediaStream(); + + const pc = new RTCPeerConnection(pcConfig); + pc.ontrack = event => videoPlayer.srcObject.addTrack(event.track); + pc.onicecandidate = event => { + if (event.candidate === null) return; + + console.log("Sent ICE candidate:", event.candidate); + ws.send(JSON.stringify({ type: "ice_candidate", data: event.candidate })); + }; + + ws.onmessage = async event => { + const { type, data } = JSON.parse(event.data); + + switch (type) { + case "sdp_offer": + console.log("Received SDP offer:", data); + await pc.setRemoteDescription(data); + const answer = await pc.createAnswer(); + await pc.setLocalDescription(answer); + ws.send(JSON.stringify({ type: "sdp_answer", data: answer })); + console.log("Sent SDP answer:", answer) + break; + case "ice_candidate": + console.log("Recieved ICE candidate:", data); + await pc.addIceCandidate(data); + } + }; +}; \ No newline at end of file diff --git a/examples_outputs/.gitignore b/examples_outputs/.gitignore new file mode 100644 index 0000000..c3d07e5 --- /dev/null +++ b/examples_outputs/.gitignore @@ -0,0 +1,3 @@ +* +!.gitkeep +!.gitignore diff --git a/examples_outputs/.gitkeep b/examples_outputs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/lib/boombox.ex b/lib/boombox.ex new file mode 100644 index 0000000..3028141 --- /dev/null +++ b/lib/boombox.ex @@ -0,0 +1,26 @@ +defmodule Boombox do + @moduledoc """ + Boombox is a tool for audio and video streaming. + + See `t:input/0` and `t:output/0` for supported protocols. + """ + @type webrtc_opts :: Membrane.WebRTC.SignalingChannel.t() | URI.t() + + @type input :: + URI.t() + | Path.t() + | {:file, :mp4, Path.t()} + | {:webrtc, webrtc_opts()} + | {:rtmp, URI.t()} + @type output :: URI.t() | Path.t() | {:file, :mp4, Path.t()} | {:webrtc, webrtc_opts()} + + @spec run(input: input, output: output) :: :ok + def run(opts) do + {:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(Boombox.Pipeline, opts) + Process.monitor(supervisor) + + receive do + {:DOWN, _monitor, :process, ^supervisor, _reason} -> :ok + end + end +end diff --git a/lib/boombox/mp4.ex b/lib/boombox/mp4.ex new file mode 100644 index 0000000..db5da1c --- /dev/null +++ b/lib/boombox/mp4.ex @@ -0,0 +1,70 @@ +defmodule Boombox.MP4 do + @moduledoc false + + import Membrane.ChildrenSpec + require Membrane.Pad, as: Pad + alias Boombox.Pipeline.{Ready, Wait} + + @spec create_input(String.t()) :: Wait.t() + def create_input(location) do + spec = + child(%Membrane.File.Source{location: location, seekable?: true}) + |> child(:mp4_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}) + + %Wait{actions: [spec: spec]} + end + + @spec handle_input_tracks(Membrane.MP4.Demuxer.ISOM.new_tracks_t()) :: Ready.t() + def handle_input_tracks(tracks) do + track_builders = + Map.new(tracks, fn + {id, %Membrane.AAC{}} -> + spec = + get_child(:mp4_demuxer) + |> via_out(Pad.ref(:output, id)) + |> child(:mp4_in_aac_parser, Membrane.AAC.Parser) + |> child(:mp4_in_aac_decoder, Membrane.AAC.FDK.Decoder) + + {:audio, spec} + + {id, %Membrane.H264{}} -> + spec = get_child(:mp4_demuxer) |> via_out(Pad.ref(:output, id)) + {:video, spec} + end) + + %Ready{track_builders: track_builders} + end + + @spec link_output( + String.t(), + Boombox.Pipeline.track_builders(), + Membrane.ChildrenSpec.t() + ) :: Ready.t() + def link_output(location, track_builders, spec_builder) do + spec = + [ + spec_builder, + child(:mp4_muxer, Membrane.MP4.Muxer.ISOM) + |> child(:mp4_file_sink, %Membrane.File.Sink{location: location}), + Enum.map(track_builders, fn + {:audio, builder} -> + builder + |> child(:mp4_out_aac_encoder, Membrane.AAC.FDK.Encoder) + |> child(:mp4_out_aac_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none, + output_config: :esds + }) + |> via_in(Pad.ref(:input, :audio)) + |> get_child(:mp4_muxer) + + {:video, builder} -> + builder + |> child(:mp4_out_h264_parser, %Membrane.H264.Parser{output_stream_structure: :avc3}) + |> via_in(Pad.ref(:input, :video)) + |> get_child(:mp4_muxer) + end) + ] + + %Ready{actions: [spec: spec]} + end +end diff --git a/lib/boombox/pipeline.ex b/lib/boombox/pipeline.ex new file mode 100644 index 0000000..6b414aa --- /dev/null +++ b/lib/boombox/pipeline.ex @@ -0,0 +1,333 @@ +defmodule Boombox.Pipeline do + @moduledoc false + # The pipeline that spawns all the Boombox children. + # Support for each endpoint is handled via create_output, create_input + # and link_output functions. Each of them should return one of: + # - `t:Ready.t/0` - Returns the control to Boombox. + # - `t:Wait.t/0 - Waits for some endpoint-specific action to happen. + # When it does, `proceed_result` needs to be called with `Ready` + # or another `Wait`. + # + # The purpose of each function is the following: + # - create_output - Called at the beginning, initializes the output. + # Needed only when the output needs initialization before tracks + # are known. + # - create_input - Initializes the input. Called after the output finishes + # initialization. Should return `t:track_builders/0` in `Ready`. + # If there's any spec that needs to be returned with the track builders, + # it can be set in the `spec_builder` field of `Ready`. + # - link_output - Gets the track builders and spec builder. It should + # link the track builders to appropriate inputs and return the spec + # builder in the same spec. + + use Membrane.Pipeline + + require Membrane.Logger + + @type track_builders :: %{ + optional(:audio) => Membrane.ChildrenSpec.t(), + optional(:video) => Membrane.ChildrenSpec.t() + } + + defmodule Ready do + @moduledoc false + + @type t :: %__MODULE__{ + actions: [Membrane.Pipeline.Action.t()], + track_builders: Boombox.Pipeline.track_builders() | nil, + spec_builder: Membrane.ChildrenSpec.t() | nil, + eos_info: term + } + + defstruct actions: [], track_builders: nil, spec_builder: [], eos_info: nil + end + + defmodule Wait do + @moduledoc false + + @type t :: %__MODULE__{actions: [Membrane.Pipeline.Action.t()]} + defstruct actions: [] + end + + defmodule State do + @moduledoc false + + @enforce_keys [:status, :input, :output] + + defstruct @enforce_keys ++ + [ + actions_acc: [], + spec_builder: [], + track_builders: nil, + last_result: nil, + eos_info: nil + ] + + @typedoc """ + Statuses of the Boombox pipeline in the order of occurence. + + Statuses starting with `awaiting_` occur only if + the `proceed` function returns `t:Wait.t/0` when in the + preceeding status. + """ + @type status :: + :init + | :awaiting_output + | :output_ready + | :awaiting_input + | :input_ready + | :awaiting_output_link + | :output_linked + | :running + + @type t :: %__MODULE__{ + status: status(), + input: Boombox.input(), + output: Boombox.output(), + actions_acc: [Membrane.Pipeline.Action.t()], + spec_builder: Membrane.ChildrenSpec.t(), + track_builders: Boombox.Pipeline.track_builders() | nil, + last_result: Boombox.Pipeline.Ready.t() | Boombox.Pipeline.Wait.t() | nil, + eos_info: term() + } + end + + @impl true + def handle_init(ctx, opts) do + state = %State{ + input: opts |> Keyword.fetch!(:input) |> parse_input(), + output: opts |> Keyword.fetch!(:output) |> parse_output(), + status: :init + } + + proceed(ctx, state) + end + + @impl true + def handle_child_notification({:new_tracks, tracks}, :mp4_demuxer, ctx, state) do + Boombox.MP4.handle_input_tracks(tracks) + |> proceed_result(ctx, state) + end + + @impl true + def handle_child_notification({:new_tracks, tracks}, :webrtc_input, ctx, state) do + Boombox.WebRTC.handle_input_tracks(tracks) + |> proceed_result(ctx, state) + end + + @impl true + def handle_child_notification({:new_tracks, tracks}, :webrtc_output, ctx, state) do + unless state.status == :awaiting_output_link do + raise """ + Invalid status: #{inspect(state.status)}, expected :awaiting_output_link. \ + This is probably a bug in Boombox. + """ + end + + Boombox.WebRTC.handle_output_tracks_negotiated( + state.track_builders, + state.spec_builder, + tracks + ) + |> proceed_result(ctx, state) + end + + @impl true + def handle_child_notification({:end_of_stream, id}, :webrtc_output, _ctx, state) do + %{eos_info: track_ids} = state + track_ids = List.delete(track_ids, id) + state = %{state | eos_info: track_ids} + + if track_ids == [] do + wait_before_closing() + {[terminate: :normal], state} + else + {[], state} + end + end + + @impl true + def handle_child_notification(notification, child, _ctx, state) do + Membrane.Logger.debug_verbose( + "Ignoring notification #{inspect(notification)} from child #{inspect(child)}" + ) + + {[], state} + end + + @impl true + def handle_info({:rtmp_client_ref, client_ref}, ctx, state) do + Boombox.RTMP.handle_connection(client_ref) + |> proceed_result(ctx, state) + end + + @impl true + def handle_element_end_of_stream(:mp4_file_sink, :input, _ctx, state) do + {[terminate: :normal], state} + end + + @impl true + def handle_element_end_of_stream(_element, _pad, _ctx, state) do + {[], state} + end + + @spec proceed_result(Ready.t() | Wait.t(), Membrane.Pipeline.CallbackContext.t(), State.t()) :: + Membrane.Pipeline.callback_return() + defp proceed_result(result, ctx, %{status: :awaiting_input} = state) do + do_proceed(result, :input_ready, :awaiting_input, ctx, state) + end + + defp proceed_result(result, ctx, %{status: :awaiting_output_link} = state) do + do_proceed(result, :output_linked, :awaiting_output_link, ctx, state) + end + + defp proceed_result(result, ctx, %{status: :running} = state) do + do_proceed(result, nil, :running, ctx, state) + end + + defp proceed_result(_result, _ctx, state) do + raise """ + Boombox got into invalid internal state. Status: #{inspect(state.status)}. + """ + end + + @spec proceed(Membrane.Pipeline.CallbackContext.t(), State.t()) :: + Membrane.Pipeline.callback_return() + defp proceed(ctx, %{status: :init} = state) do + create_output(state.output, ctx) + |> do_proceed(:output_ready, :awaiting_output, ctx, state) + end + + defp proceed(ctx, %{status: :output_ready} = state) do + create_input(state.input, ctx) + |> do_proceed(:input_ready, :awaiting_input, ctx, state) + end + + defp proceed( + ctx, + %{ + status: :input_ready, + last_result: %Ready{track_builders: track_builders, spec_builder: spec_builder} + } = state + ) + when track_builders != nil do + state = %{state | track_builders: track_builders, spec_builder: spec_builder} + + link_output(state.output, track_builders, spec_builder, ctx) + |> do_proceed(:output_linked, :awaiting_output_link, ctx, state) + end + + defp proceed(ctx, %{status: :output_linked} = state) do + do_proceed(%Wait{}, nil, :running, ctx, %{state | eos_info: state.last_result.eos_info}) + end + + defp proceed(_ctx, state) do + raise """ + Boombox got into invalid internal state. Status: #{inspect(state.status)}. + """ + end + + @spec do_proceed( + Ready.t() | Wait.t(), + State.status() | nil, + State.status() | nil, + Membrane.Pipeline.CallbackContext.t(), + State.t() + ) :: Membrane.Pipeline.callback_return() + defp do_proceed(result, ready_status, wait_status, ctx, state) do + %{actions_acc: actions_acc} = state + + case result do + %Ready{actions: actions} = result when ready_status != nil -> + proceed(ctx, %{ + state + | status: ready_status, + last_result: result, + actions_acc: actions_acc ++ actions + }) + + %Wait{actions: actions} when wait_status != nil -> + {actions_acc ++ actions, %{state | actions_acc: [], status: wait_status}} + end + end + + @spec create_input(Boombox.input(), Membrane.Pipeline.CallbackContext.t()) :: + Ready.t() | Wait.t() + defp create_input({:webrtc, signaling}, _ctx) do + Boombox.WebRTC.create_input(signaling) + end + + defp create_input({:file, :mp4, location}, _ctx) do + Boombox.MP4.create_input(location) + end + + defp create_input({:rtmp, uri}, ctx) do + Boombox.RTMP.create_input(uri, ctx.utility_supervisor) + end + + @spec create_output(Boombox.output(), Membrane.Pipeline.CallbackContext.t()) :: + Ready.t() | Wait.t() + defp create_output({:webrtc, signaling}, _ctx) do + Boombox.WebRTC.create_output(signaling) + end + + defp create_output(_output, _ctx) do + %Ready{} + end + + @spec link_output( + Boombox.output(), + track_builders(), + Membrane.ChildrenSpec.t(), + Membrane.Pipeline.CallbackContext.t() + ) :: + Ready.t() | Wait.t() + defp link_output({:webrtc, _signaling}, track_builders, _spec_builder, _ctx) do + Boombox.WebRTC.link_output(track_builders) + end + + defp link_output({:file, :mp4, location}, track_builders, spec_builder, _ctx) do + Boombox.MP4.link_output(location, track_builders, spec_builder) + end + + defp parse_input(input) when is_binary(input) do + uri = URI.new!(input) + + cond do + uri.scheme == nil and Path.extname(uri.path) == ".mp4" -> + {:file, :mp4, uri.path} + + uri.scheme == "rtmp" -> + {:rtmp, input} + + true -> + raise "Couldn't parse URI: #{input}" + end + end + + defp parse_input(input) when is_tuple(input) do + input + end + + defp parse_output(output) when is_binary(output) do + uri = URI.new!(output) + + if uri.scheme == nil and Path.extname(uri.path) == ".mp4" do + {:file, :mp4, uri.path} + else + raise "Couldn't parse URI: #{output}" + end + end + + defp parse_output(output) when is_tuple(output) do + output + end + + # Wait between sending the last packet + # and terminating boombox, to avoid closing + # any connection before the other peer + # receives the last packet. + defp wait_before_closing() do + Process.sleep(500) + end +end diff --git a/lib/boombox/rtmp.ex b/lib/boombox/rtmp.ex new file mode 100644 index 0000000..c474a5e --- /dev/null +++ b/lib/boombox/rtmp.ex @@ -0,0 +1,56 @@ +defmodule Boombox.RTMP do + @moduledoc false + + import Membrane.ChildrenSpec + require Membrane.Logger + alias Boombox.Pipeline.{Ready, Wait} + alias Membrane.RTMP + + @spec create_input(String.t(), pid()) :: Wait.t() + def create_input(uri, utility_supervisor) do + {use_ssl?, port, target_app, target_stream_key} = RTMP.Utils.parse_url(uri) + + boombox = self() + + new_client_callback = fn client_ref, app, stream_key -> + if app == target_app and stream_key == target_stream_key do + send(boombox, {:rtmp_client_ref, client_ref}) + else + Membrane.Logger.warning("Unexpected client connected on /#{app}/#{stream_key}") + end + end + + server_options = %{ + handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()}, + port: port, + use_ssl?: use_ssl?, + new_client_callback: new_client_callback, + client_timeout: 1_000 + } + + {:ok, _server} = + Membrane.UtilitySupervisor.start_link_child( + utility_supervisor, + {Membrane.RTMP.Server, server_options} + ) + + %Wait{} + end + + @spec handle_connection(pid()) :: Ready.t() + def handle_connection(client_ref) do + spec = [ + child(:rtmp_source, %Membrane.RTMP.SourceBin{client_ref: client_ref}) + |> via_out(:audio) + |> child(:rtmp_in_aac_parser, Membrane.AAC.Parser) + |> child(:rtmp_in_aac_decoder, Membrane.AAC.FDK.Decoder) + ] + + track_builders = %{ + audio: get_child(:rtmp_in_aac_decoder), + video: get_child(:rtmp_source) |> via_out(:video) + } + + %Ready{spec_builder: spec, track_builders: track_builders} + end +end diff --git a/lib/boombox/webrtc.ex b/lib/boombox/webrtc.ex new file mode 100644 index 0000000..9d03d9c --- /dev/null +++ b/lib/boombox/webrtc.ex @@ -0,0 +1,113 @@ +defmodule Boombox.WebRTC do + @moduledoc false + + import Membrane.ChildrenSpec + require Membrane.Pad, as: Pad + alias Boombox.Pipeline.{Ready, Wait} + + @spec create_input(Boombox.webrtc_opts()) :: Wait.t() + def create_input(signaling) do + signaling = resolve_signaling(signaling) + + spec = + child(:webrtc_input, %Membrane.WebRTC.Source{ + signaling: signaling, + video_codec: :h264 + }) + + %Wait{actions: [spec: spec]} + end + + @spec handle_input_tracks(Membrane.WebRTC.Source.new_tracks()) :: Ready.t() + def handle_input_tracks(tracks) do + track_builders = + Map.new(tracks, fn + %{kind: :audio, id: id} -> + spec = + get_child(:webrtc_input) + |> via_out(Pad.ref(:output, id)) + |> child(:webrtc_in_opus_decoder, Membrane.Opus.Decoder) + + {:audio, spec} + + %{kind: :video, id: id} -> + spec = + get_child(:webrtc_input) + |> via_out(Pad.ref(:output, id)) + + {:video, spec} + end) + + %Ready{track_builders: track_builders} + end + + @spec create_output(Boombox.webrtc_opts()) :: Ready.t() + def create_output(signaling) do + signaling = resolve_signaling(signaling) + + spec = + child(:webrtc_output, %Membrane.WebRTC.Sink{ + signaling: signaling, + tracks: [], + video_codec: :h264 + }) + + %Ready{actions: [spec: spec]} + end + + @spec link_output(Boombox.Pipeline.track_builders()) :: Wait.t() + def link_output(track_builders) do + tracks = Bunch.KVEnum.keys(track_builders) + %Wait{actions: [notify_child: {:webrtc_output, {:add_tracks, tracks}}]} + end + + @spec handle_output_tracks_negotiated( + Boombox.Pipeline.track_builders(), + Membrane.ChildrenSpec.t(), + Membrane.WebRTC.Sink.new_tracks() + ) :: Ready.t() + def handle_output_tracks_negotiated(track_builders, spec_builder, tracks) do + tracks = Map.new(tracks, &{&1.kind, &1.id}) + + spec = [ + spec_builder, + Enum.map(track_builders, fn + {:audio, builder} -> + builder + |> child(:webrtc_out_resampler, %Membrane.FFmpeg.SWResample.Converter{ + output_stream_format: %Membrane.RawAudio{ + sample_format: :s16le, + sample_rate: 48_000, + channels: 2 + } + }) + |> child(:webrtc_out_opus_encoder, Membrane.Opus.Encoder) + |> child(:webrtc_out_audio_realtimer, Membrane.Realtimer) + |> via_in(Pad.ref(:input, tracks.audio), options: [kind: :audio]) + |> get_child(:webrtc_output) + + {:video, builder} -> + builder + |> child(:webrtc_out_video_realtimer, Membrane.Realtimer) + |> child(:webrtc_out_h264_parser, %Membrane.H264.Parser{ + output_stream_structure: :annexb, + output_alignment: :nalu + }) + |> via_in(Pad.ref(:input, tracks.video), options: [kind: :video]) + |> get_child(:webrtc_output) + end) + ] + + %Ready{actions: [spec: spec], eos_info: Map.values(tracks)} + end + + defp resolve_signaling(%Membrane.WebRTC.SignalingChannel{} = signaling) do + signaling + end + + defp resolve_signaling(uri) when is_binary(uri) do + uri = URI.new!(uri) + {:ok, ip} = :inet.getaddr(~c"#{uri.host}", :inet) + {:websocket, ip: ip, port: uri.port} + end +end diff --git a/lib/membrane_template.ex b/lib/membrane_template.ex deleted file mode 100644 index c6882fb..0000000 --- a/lib/membrane_template.ex +++ /dev/null @@ -1,2 +0,0 @@ -defmodule Membrane.Template do -end diff --git a/mix.exs b/mix.exs index 3d9466b..aad5ecf 100644 --- a/mix.exs +++ b/mix.exs @@ -1,12 +1,12 @@ -defmodule Membrane.Template.Mixfile do +defmodule Boombox.Mixfile do use Mix.Project @version "0.1.0" - @github_url "https://github.com/membraneframework/membrane_template_plugin" + @github_url "https://github.com/membraneframework/boombox" def project do [ - app: :membrane_template_plugin, + app: :boombox, version: @version, elixir: "~> 1.13", elixirc_paths: elixirc_paths(Mix.env()), @@ -15,11 +15,11 @@ defmodule Membrane.Template.Mixfile do dialyzer: dialyzer(), # hex - description: "Template Plugin for Membrane Framework", + description: "Boombox", package: package(), # docs - name: "Membrane Template plugin", + name: "Boombox", source_url: @github_url, docs: docs() ] @@ -36,7 +36,17 @@ defmodule Membrane.Template.Mixfile do defp deps do [ - {:membrane_core, "~> 1.0"}, + {:membrane_core, "~> 1.1"}, + {:membrane_webrtc_plugin, "~> 0.21.0"}, + {:membrane_opus_plugin, "~> 0.20.0"}, + {:membrane_aac_plugin, "~> 0.18.0"}, + {:membrane_aac_fdk_plugin, "~> 0.18.0"}, + {:membrane_h26x_plugin, "~> 0.10.0"}, + {:membrane_h264_ffmpeg_plugin, "~> 0.32.0"}, + {:membrane_mp4_plugin, github: "membraneframework/membrane_mp4_plugin", branch: "wip-avc3"}, + {:membrane_realtimer_plugin, "~> 0.9.0"}, + {:membrane_rtmp_plugin, github: "membraneframework/membrane_rtmp_plugin"}, + {:membrane_ffmpeg_swresample_plugin, "~> 0.20.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, {:credo, ">= 0.0.0", only: :dev, runtime: false} @@ -73,7 +83,7 @@ defmodule Membrane.Template.Mixfile do extras: ["README.md", "LICENSE"], formatters: ["html"], source_ref: "v#{@version}", - nest_modules_by_prefix: [Membrane.Template] + nest_modules_by_prefix: [Boombox] ] end end diff --git a/mix.lock b/mix.lock index 8d15140..07dc522 100644 --- a/mix.lock +++ b/mix.lock @@ -1,21 +1,87 @@ %{ - "bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"}, - "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, + "bandit": {:hex, :bandit, "1.5.7", "6856b1e1df4f2b0cb3df1377eab7891bec2da6a7fd69dc78594ad3e152363a50", [:mix], [{:hpax, "~> 1.0.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "f2dd92ae87d2cbea2fa9aa1652db157b6cba6c405cb44d4f6dd87abba41371cd"}, + "bimap": {:hex, :bimap, "1.3.0", "3ea4832e58dc83a9b5b407c6731e7bae87458aa618e6d11d8e12114a17afa4b3", [:mix], [], "hexpm", "bf5a2b078528465aa705f405a5c638becd63e41d280ada41e0f77e6d255a10b4"}, + "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, + "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"}, + "bundlex": {:hex, :bundlex, "1.5.3", "35d01e5bc0679510dd9a327936ffb518f63f47175c26a35e708cc29eaec0890b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "debd0eac151b404f6216fc60222761dff049bf26f7d24d066c365317650cd118"}, + "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, - "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, - "dialyxir": {:hex, :dialyxir, "1.4.1", "a22ed1e7bd3a3e3f197b68d806ef66acb61ee8f57b3ac85fc5d57354c5482a93", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "84b795d6d7796297cca5a3118444b80c7d94f7ce247d49886e7c291e1ae49801"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.35", "437773ca9384edf69830e26e9e7b2e0d22d2596c4a6b17094a3b29f01ea65bb8", [:mix], [], "hexpm", "8652ba3cb85608d0d7aa2d21b45c6fad4ddc9a1f9a1f1b30ca3a246f0acc33f6"}, - "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, - "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, - "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, - "membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, + "crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"}, + "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, + "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, + "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, + "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, + "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, + "ex_dtls": {:hex, :ex_dtls, "0.15.2", "6c8c0f8eb67525216551bd3e0322ab33c9d851d56ef3e065efab4fd277a8fbb9", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "6b852bc926bbdc9c1b9c4ecc6cfc73a89d4e106042802cefea2c1503072a9f2a"}, + "ex_ice": {:hex, :ex_ice, "0.7.1", "3ad14f7281ece304dfee227e332b8a67d93d5857602a8a4300a826c250af136e", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "78e6bc4abb5294dcf0a474d0a91e78a829916291d846a0e255867dc5db8733e7"}, + "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, + "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, + "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, + "ex_sdp": {:hex, :ex_sdp, "0.17.0", "4c50e7814f01f149c0ccf258fba8428f8567dffecf1c416ec3f6aaaac607a161", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "c7fe0625902be2a835b5fe6834a189f7db7639d2625c8e9d8b3564e6d704145f"}, + "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, + "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"}, + "ex_webrtc": {:hex, :ex_webrtc, "0.3.0", "283f5b31d539f65238596793aabcefe32d221618ceb751ae68951712a486cac2", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.15.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:ex_ice, "~> 0.7.0", [hex: :ex_ice, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.7.1", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.17.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}], "hexpm", "a8a4f38cdcacae170615d6abb83d8c42220b6ac0133d84b900f4994d5eff7143"}, + "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, + "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, + "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, + "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, + "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, + "membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.8", "88d47923805cbd9a977fc7e5d3eb8d3028a2e358ad9ad7b124684adc78c2e8ee", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "bb9e706d0949954affd4e295f5d3d4660096997756b5422119800d961c46cc63"}, + "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, + "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"}, + "membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"}, + "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, + "membrane_core": {:hex, :membrane_core, "1.1.1", "4dcff6e9f3b2ecd4f437c20e201e53957731772c0f15b3005062c41f7f58f500", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3802f3fc071505c59d48792487d9927e803d4edb4039710ffa52cdb60bb0aecc"}, + "membrane_ffmpeg_swresample_plugin": {:hex, :membrane_ffmpeg_swresample_plugin, "0.20.2", "2e669f0b25418d10b51a73bc52d2e12e4a3a26b416c5c1199d852c3f781a18b3", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:mockery, "~> 2.1", [hex: :mockery, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "6c8d3bcd61d568dd94cabb9b45f29e8926e0076e4432d8f419378e004e02147c"}, + "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"}, + "membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.12.0", "d715ad405af86dcaf4b2f479e34088e1f6738c7280366828e1066b39d2aa493a", [:mix], [{:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}], "hexpm", "a317872d6d394e550c7bfd8979f12a3a1cc1e89b547d75360321025b403d3279"}, + "membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.0", "9cfe09e44d65751f7d9d8d3c42e14797f7be69e793ac112ea63cd224af70a7bf", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "988790aca59d453a6115109f050699f7f45a2eb6a7f8dc5c96392760cddead54"}, + "membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.32.1", "1e9eb5647dd5fcfc4a35b69b6c5bdaad27e51d3f8fc0a7fe17cb02b710353cbf", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "e28aafb236587c6e093d610f3e4ee5fd6801250152f4743037b096b49e6e9a53"}, + "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, + "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, + "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"}, + "membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"}, + "membrane_mp4_plugin": {:git, "https://github.com/membraneframework/membrane_mp4_plugin.git", "3786a3e834cc2ce541c3c7830590b39da87e12a1", [branch: "wip-avc3"]}, + "membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"}, + "membrane_opus_plugin": {:hex, :membrane_opus_plugin, "0.20.2", "f259a46f87c1749cd8dc8cbaf4b00df1085b39a8f0deef219a93ccda8cb54006", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "4540615a49b3e3f008ba30532893ff1efb9d79b974e0b947efb92d49fc9cccbd"}, + "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"}, + "membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"}, + "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.3.0", "ba10f475e0814a6fe79602a74536b796047577c7ef5b0e33def27cd344229699", [:mix], [], "hexpm", "2f08760061c8a5386ecf04273480f10e48d25a1a40aa99476302b0bcd34ccb1c"}, + "membrane_realtimer_plugin": {:hex, :membrane_realtimer_plugin, "0.9.0", "27210d5e32a5e8bfd101c41e4d8c1876e873a52cc129ebfbee4d0ccbea1cbd21", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b2e96d62135ee57ef9a5fdea94b3a9ab1198e5ea8ee248391b89c671125d1b51"}, + "membrane_rtmp_plugin": {:git, "https://github.com/membraneframework/membrane_rtmp_plugin.git", "dc6a2959a00666c1f0b9dab9c30f69b8ca0fa55d", []}, + "membrane_rtp_format": {:hex, :membrane_rtp_format, "0.8.0", "828924bbd27efcf85b2015ae781e824c4a9928f0a7dc132abc66817b2c6edfc4", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "bc75d2a649dfaef6df563212fbb9f9f62eebc871393692f9dae8d289bd4f94bb"}, + "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.19.2", "de3eeaf35052f9f709d469fa7630d9ecc8f5787019f7072516eae1fd881bc792", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "d298e9cd471ab3601366c48ca0fec84135966707500152bbfcf3f968700647ae"}, + "membrane_rtp_opus_plugin": {:hex, :membrane_rtp_opus_plugin, "0.9.0", "ae76421faa04697a4af76a55b6c5e675dea61b611d29d8201098783d42863af7", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "58f095d2978daf999d87c1c016007cb7d99434208486331ab5045e77f5be9dcc"}, + "membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.29.0", "0277310eb599b8e6de9e0b864807f23b3b245865e39a28f0cbab695d1f2c157e", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "1b3fd808114e06332b6a4e000238998a9188d1ef625c414ca3239aee70f0775d"}, + "membrane_rtp_vp8_plugin": {:hex, :membrane_rtp_vp8_plugin, "0.9.1", "9e8a74d764730a23382ba862a238963c9639b4c6963238caeb6fe2449a66add8", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_vp8_format, "~> 0.4.0", [hex: :membrane_vp8_format, repo: "hexpm", optional: false]}], "hexpm", "704856eb2734bb6ea5cc47242c241de45debb5724a81cffb344bacda9867fe98"}, + "membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"}, + "membrane_vp8_format": {:hex, :membrane_vp8_format, "0.4.0", "6c29ec67479edfbab27b11266dc92f18f3baf4421262c5c31af348c33e5b92c7", [:mix], [], "hexpm", "8bb005ede61db8fcb3535a883f32168b251c2dfd1109197c8c3b39ce28ed08e2"}, + "membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.21.0", "0d47a6ffe3eb18abf43e9f6d089a409120ecd5cff43095d065fbb9e1c038f79c", [:mix], [{:bandit, "~> 1.2", [hex: :bandit, repo: "hexpm", optional: false]}, {:ex_webrtc, "~> 0.3.0", [hex: :ex_webrtc, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, "~> 0.9.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.29.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.9.1", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.0", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "39d383eadb1b1ce10975ac8505012e901c8961e6f5a65577ff0fbf03b7bc8fc7"}, + "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, + "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, + "mockery": {:hex, :mockery, "2.3.3", "3dba87bd0422a513e6af6e0d811383f38f82ac6be5d3d285a5fcca9c299bd0ac", [:mix], [], "hexpm", "17282be00613286254298117cd25e607a39f15ac03b41c631f60e52f5b5ec974"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, + "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, - "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, + "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, + "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, + "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.2", "2caabe9344ec17eafe5403304771c3539f3b6e2f7fb6a6f602558c825d0d0bfb", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b43db0dc33863930b9ef9d27137e78974756f5f198cae18409970ed6fa5b561"}, + "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, + "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"}, + "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"}, + "zarex": {:hex, :zarex, "1.0.5", "58239e3ee5d75f343262bb4df5cf466555a1c689f920e5d3651a9333972f7c7e", [:mix], [], "hexpm", "9fb72ef0567c2b2742f5119a1ba8a24a2fabb21b8d09820aefbf3e592fa9a46a"}, } diff --git a/test/boombox_test.exs b/test/boombox_test.exs new file mode 100644 index 0000000..9d99aa7 --- /dev/null +++ b/test/boombox_test.exs @@ -0,0 +1,163 @@ +defmodule BoomboxTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + import Support.Async + + require Membrane.Pad, as: Pad + require Logger + + alias Membrane.Testing + alias Support.Compare + + @bbb_mp4 "test/fixtures/bun10s.mp4" + @bbb_mp4_a "test/fixtures/bun10s_a.mp4" + @bbb_mp4_v "test/fixtures/bun10s_v.mp4" + + @moduletag :tmp_dir + + @tag :mp4 + async_test "mp4 -> mp4", %{tmp_dir: tmp} do + output = Path.join(tmp, "output.mp4") + Boombox.run(input: @bbb_mp4, output: output) + Compare.compare(output, "test/fixtures/ref_bun10s_aac.mp4") + end + + @tag :mp4_audio + async_test "mp4 -> mp4 audio", %{tmp_dir: tmp} do + Boombox.run(input: @bbb_mp4_a, output: "#{tmp}/output.mp4") + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_aac.mp4", :audio) + end + + @tag :mp4_video + async_test "mp4 -> mp4 video", %{tmp_dir: tmp} do + Boombox.run(input: @bbb_mp4_v, output: "#{tmp}/output.mp4") + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_aac.mp4", :video) + end + + @tag :webrtc + async_test "mp4 -> webrtc -> mp4", %{tmp_dir: tmp} do + signaling = Membrane.WebRTC.SignalingChannel.new() + t = Task.async(fn -> Boombox.run(input: @bbb_mp4, output: {:webrtc, signaling}) end) + Boombox.run(input: {:webrtc, signaling}, output: "#{tmp}/output.mp4") + Task.await(t) + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_opus_aac.mp4") + end + + @tag :webrtc_audio + async_test "mp4 -> webrtc -> mp4 audio", %{tmp_dir: tmp} do + signaling = Membrane.WebRTC.SignalingChannel.new() + + t = + Task.async(fn -> Boombox.run(input: @bbb_mp4_a, output: {:webrtc, signaling}) end) + + Boombox.run(input: {:webrtc, signaling}, output: "#{tmp}/output.mp4") + Task.await(t) + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_opus_aac.mp4", :audio) + end + + @tag :webrtc_video + async_test "mp4 -> webrtc -> mp4 video", %{tmp_dir: tmp} do + signaling = Membrane.WebRTC.SignalingChannel.new() + + t = + Task.async(fn -> Boombox.run(input: @bbb_mp4_v, output: {:webrtc, signaling}) end) + + Boombox.run(input: {:webrtc, signaling}, output: "#{tmp}/output.mp4") + Task.await(t) + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_opus_aac.mp4", :video) + end + + @tag :webrtc2 + async_test "mp4 -> webrtc -> webrtc -> mp4", %{tmp_dir: tmp} do + signaling1 = Membrane.WebRTC.SignalingChannel.new() + signaling2 = Membrane.WebRTC.SignalingChannel.new() + + t1 = + Task.async(fn -> Boombox.run(input: @bbb_mp4, output: {:webrtc, signaling1}) end) + + t2 = + Task.async(fn -> + Boombox.run(input: {:webrtc, signaling1}, output: {:webrtc, signaling2}) + end) + + Boombox.run(input: {:webrtc, signaling2}, output: "#{tmp}/output.mp4") + Task.await(t1) + Task.await(t2) + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_opus2_aac.mp4") + end + + @tag :rtmp + async_test "rtmp -> mp4", %{tmp_dir: tmp} do + url = "rtmp://localhost:5000/app/stream_key" + t = Task.async(fn -> Boombox.run(input: url, output: "#{tmp}/output.mp4") end) + + # Wait for boombox to be ready + Process.sleep(200) + p = send_rtmp(url) + Task.await(t, 30_000) + Testing.Pipeline.terminate(p) + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_aac.mp4") + end + + @tag :rtmp_webrtc + async_test "rtmp -> webrtc -> mp4", %{tmp_dir: tmp} do + url = "rtmp://localhost:5002/app/stream_key" + + signaling = Membrane.WebRTC.SignalingChannel.new() + + t1 = + Task.async(fn -> Boombox.run(input: url, output: {:webrtc, signaling}) end) + + t2 = + Task.async(fn -> Boombox.run(input: {:webrtc, signaling}, output: "#{tmp}/output.mp4") end) + + # Wait for boombox to be ready + Process.sleep(200) + p = send_rtmp(url) + Task.await(t1, 30_000) + Task.await(t2) + Testing.Pipeline.terminate(p) + Compare.compare("#{tmp}/output.mp4", "test/fixtures/ref_bun10s_opus_aac.mp4") + end + + defp send_rtmp(url) do + p = + Testing.Pipeline.start_link_supervised!( + spec: + child(%Membrane.File.Source{location: @bbb_mp4, seekable?: true}) + |> child(:demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}) + ) + + assert_pipeline_notified(p, :demuxer, {:new_tracks, tracks}) + + [{audio_id, %Membrane.AAC{}}, {video_id, %Membrane.H264{}}] = + Enum.sort_by(tracks, fn {_id, %format{}} -> format end) + + Testing.Pipeline.execute_actions(p, + spec: [ + get_child(:demuxer) + |> via_out(Pad.ref(:output, video_id)) + |> child(Membrane.Realtimer) + |> child(:video_parser, %Membrane.H264.Parser{ + output_stream_structure: :avc1 + }) + |> via_in(Pad.ref(:video, 0)) + |> get_child(:rtmp_sink), + get_child(:demuxer) + |> via_out(Pad.ref(:output, audio_id)) + |> child(Membrane.Realtimer) + |> child(:audio_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none, + output_config: :esds + }) + |> via_in(Pad.ref(:audio, 0)) + |> get_child(:rtmp_sink), + child(:rtmp_sink, %Membrane.RTMP.Sink{rtmp_url: url}) + ] + ) + + p + end +end diff --git a/test/fixtures/bun10s.mp4 b/test/fixtures/bun10s.mp4 new file mode 100644 index 0000000..b7b2741 Binary files /dev/null and b/test/fixtures/bun10s.mp4 differ diff --git a/test/fixtures/bun10s_a.mp4 b/test/fixtures/bun10s_a.mp4 new file mode 100644 index 0000000..2fe275e Binary files /dev/null and b/test/fixtures/bun10s_a.mp4 differ diff --git a/test/fixtures/bun10s_v.mp4 b/test/fixtures/bun10s_v.mp4 new file mode 100644 index 0000000..f73ece8 Binary files /dev/null and b/test/fixtures/bun10s_v.mp4 differ diff --git a/test/fixtures/ref_bun10s_aac.mp4 b/test/fixtures/ref_bun10s_aac.mp4 new file mode 100644 index 0000000..cf5df7d Binary files /dev/null and b/test/fixtures/ref_bun10s_aac.mp4 differ diff --git a/test/fixtures/ref_bun10s_opus2_aac.mp4 b/test/fixtures/ref_bun10s_opus2_aac.mp4 new file mode 100644 index 0000000..40568ba Binary files /dev/null and b/test/fixtures/ref_bun10s_opus2_aac.mp4 differ diff --git a/test/fixtures/ref_bun10s_opus_aac.mp4 b/test/fixtures/ref_bun10s_opus_aac.mp4 new file mode 100644 index 0000000..7963474 Binary files /dev/null and b/test/fixtures/ref_bun10s_opus_aac.mp4 differ diff --git a/test/support/async.ex b/test/support/async.ex new file mode 100644 index 0000000..79f4975 --- /dev/null +++ b/test/support/async.ex @@ -0,0 +1,57 @@ +defmodule Support.Async do + @moduledoc false + # Helper for creating asynchronous tests + # - creates a public function instead of a test + # - creates a module with a single test that calls said function + # - copies all @tags to the newly created module + # - setup and setup_all won't work (yet) + + defmacro async_test( + test_name, + context \\ quote do + %{} + end, + do: block + ) do + id = :crypto.strong_rand_bytes(12) |> Base.encode16() + test_module_name = Module.concat(__CALLER__.module, "AsyncTest_#{id}") + fun_name = :"async_test_#{id}" + after_compile_fun_name = :"async_test_ac_#{id}" + + quote do + @tags_attrs [:tag, :describetag, :moduletag] + |> Enum.flat_map(fn attr -> + Module.get_attribute(__MODULE__, attr) + |> Enum.map(&{attr, &1}) + end) + + def unquote(fun_name)(unquote(context)) do + unquote(block) + end + + def unquote(after_compile_fun_name)(_bytecode, _env) do + test_name = unquote(test_name) + fun_name = unquote(fun_name) + + content = + quote do + use ExUnit.Case, async: true + + Enum.each(unquote(@tags_attrs), fn {name, value} -> + Module.put_attribute(__MODULE__, name, value) + end) + + test unquote(test_name), context do + unquote(__MODULE__).unquote(fun_name)(context) + end + end + + Module.create(unquote(test_module_name), content, __ENV__) + end + + @after_compile {__MODULE__, unquote(after_compile_fun_name)} + + Module.delete_attribute(__MODULE__, :tag) + end + end +end diff --git a/test/support/compare.ex b/test/support/compare.ex new file mode 100644 index 0000000..9879570 --- /dev/null +++ b/test/support/compare.ex @@ -0,0 +1,130 @@ +defmodule Support.Compare do + @moduledoc false + + import ExUnit.Assertions + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + require Membrane.Pad, as: Pad + + alias Membrane.Testing + + defmodule GetBuffers do + @moduledoc false + use Membrane.Sink + + def_input_pad :input, accepted_format: _any + + @impl true + def handle_init(_ctx, _opts) do + {[], %{acc: []}} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + {[], %{acc: [buffer | state.acc]}} + end + + @impl true + def handle_end_of_stream(:input, _ctx, state) do + {[notify_parent: {:buffers, Enum.reverse(state.acc)}], state} + end + end + + @spec compare(Path.t(), Path.t(), [:audio | :video]) :: :ok + def compare(subject, reference, kinds \\ [:audio, :video]) do + kinds = Bunch.listify(kinds) + p = Testing.Pipeline.start_link_supervised!() + + Testing.Pipeline.execute_actions(p, + spec: [ + child(%Membrane.File.Source{location: subject, seekable?: true}) + |> child(:sub_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}), + child(%Membrane.File.Source{location: reference, seekable?: true}) + |> child(:ref_demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true}) + ] + ) + + assert_pipeline_notified(p, :ref_demuxer, {:new_tracks, tracks}) + + [{audio_id, %Membrane.AAC{}}, {video_id, %Membrane.H264{}}] = + Enum.sort_by(tracks, fn {_id, %format{}} -> format end) + + ref_spec = + [ + get_child(:ref_demuxer) + |> via_out(Pad.ref(:output, video_id)) + |> child(:ref_video_bufs, GetBuffers), + get_child(:ref_demuxer) + |> via_out(Pad.ref(:output, audio_id)) + |> child(:ref_aac, Membrane.AAC.Parser) + |> child(Membrane.AAC.FDK.Decoder) + |> child(:ref_audio_bufs, GetBuffers) + ] + + assert_pipeline_notified(p, :sub_demuxer, {:new_tracks, tracks}) + + assert length(tracks) == length(kinds) + + sub_spec = + Enum.map(tracks, fn + {id, %Membrane.AAC{}} -> + assert :audio in kinds + + get_child(:sub_demuxer) + |> via_out(Pad.ref(:output, id)) + |> child(Membrane.AAC.Parser) + |> child(Membrane.AAC.FDK.Decoder) + |> child(:sub_audio_bufs, GetBuffers) + + {id, %Membrane.H264{}} -> + assert :video in kinds + + get_child(:sub_demuxer) + |> via_out(Pad.ref(:output, id)) + |> child(:sub_video_bufs, GetBuffers) + end) + + Testing.Pipeline.execute_actions(p, spec: [ref_spec, sub_spec]) + + if :video in kinds do + assert_pipeline_notified(p, :sub_video_bufs, {:buffers, sub_video_bufs}) + assert_pipeline_notified(p, :ref_video_bufs, {:buffers, ref_video_bufs}) + + assert length(ref_video_bufs) == length(sub_video_bufs) + + Enum.zip(sub_video_bufs, ref_video_bufs) + |> Enum.each(fn {sub, ref} -> + assert sub.payload == ref.payload + end) + end + + if :audio in kinds do + assert_pipeline_notified(p, :sub_audio_bufs, {:buffers, sub_audio_bufs}) + assert_pipeline_notified(p, :ref_audio_bufs, {:buffers, ref_audio_bufs}) + + assert length(ref_audio_bufs) == length(sub_audio_bufs) + + Enum.zip(sub_audio_bufs, ref_audio_bufs) + |> Enum.each(fn {sub, ref} -> + # The results differ between operating systems + # and subsequent runs due to transcoding. + # The threshold here is obtained empirically and may need + # to be adjusted, or a better metric should be used. + assert samples_min_square_error(sub.payload, ref.payload) < 30_000 + end) + end + + Testing.Pipeline.terminate(p) + end + + defp samples_min_square_error(bin1, bin2) do + assert byte_size(bin1) == byte_size(bin2) + + Enum.zip(for(<>, do: b), for(<>, do: b)) + |> Enum.map(fn {b1, b2} -> + (b1 - b2) ** 2 + end) + |> then(&:math.sqrt(Enum.sum(&1) / length(&1))) + end +end