From da1508290950c0043752dbb0b0bbc0346a5b4eed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Chali=C5=84ski?= Date: Wed, 17 Jul 2024 18:08:44 +0200 Subject: [PATCH] little refactor, format --- lib/boombox/pipeline.ex | 20 +++++------------- lib/boombox/rtmp.ex | 45 ++++++++++++----------------------------- 2 files changed, 18 insertions(+), 47 deletions(-) diff --git a/lib/boombox/pipeline.ex b/lib/boombox/pipeline.ex index 098bae4..d7d5404 100644 --- a/lib/boombox/pipeline.ex +++ b/lib/boombox/pipeline.ex @@ -158,19 +158,9 @@ defmodule Boombox.Pipeline do end @impl true - def handle_info({:rtmp_tcp_server, server_pid, socket}, ctx, state) do - {result, rtmp_input_state} = - Boombox.RTMP.handle_connection(server_pid, socket, state.rtmp_input_state) - - proceed_result(result, ctx, %{state | rtmp_input_state: rtmp_input_state}) - end - - @impl true - def handle_info({:rtmp_client_ref, client_ref, app, stream_key}, ctx, state) do - {result, rtmp_input_state} = - Boombox.RTMP.handle_connection(client_ref, state) - - proceed_result(result, ctx, %{state | rtmp_input_state: rtmp_input_state}) + def handle_info({:rtmp_client_ref, client_ref}, ctx, state) do + Boombox.RTMP.handle_connection(client_ref) + |> proceed_result(ctx, state) end @impl true @@ -273,8 +263,8 @@ defmodule Boombox.Pipeline do Boombox.MP4.create_input(location) end - defp create_input([:rtmp, uri], ctx) do - Boombox.RTMP.create_input(uri, ctx.utility_supervisor) + defp create_input([:rtmp, uri], _ctx) do + Boombox.RTMP.create_input(uri) end @spec create_output(Boombox.output(), Membrane.Pipeline.CallbackContext.t()) :: diff --git a/lib/boombox/rtmp.ex b/lib/boombox/rtmp.ex index d809d22..ab288af 100644 --- a/lib/boombox/rtmp.ex +++ b/lib/boombox/rtmp.ex @@ -2,24 +2,28 @@ defmodule Boombox.RTMP do @moduledoc false import Membrane.ChildrenSpec + require Membrane.Logger alias Boombox.Pipeline.{Ready, Wait} @type state :: %{server_pid: pid()} | nil - @spec create_input(URI.t(), pid()) :: {Wait.t(), state()} - def create_input(uri, utility_supervisor) do - + @spec create_input(URI.t()) :: Wait.t() + def create_input(uri) do uri = URI.new!(uri) - [app | [stream_key]] = String.split(uri.path, "/", [trim: true]) + [target_app | [target_stream_key]] = String.split(uri.path, "/", trim: true) boombox = self() new_client_callback = fn client_ref, app, stream_key -> - send(boombox, {:rtmp_client_ref, client_ref, app, stream_key}) + if app == target_app and stream_key == target_stream_key do + send(boombox, {:rtmp_client_ref, client_ref}) + else + Membrane.Logger.warning("Unexpected client connected on /#{app}/#{stream_key}") + end end # Run the standalone server - {:ok, server} = + {:ok, _server} = Membrane.RTMP.Server.start_link( handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()}, port: uri.port, @@ -31,8 +35,8 @@ defmodule Boombox.RTMP do %Wait{} end - # @spec handle_connection(pid(), :gen_tcp.socket() | :ssl.sslsocket(), state()) :: {Ready.t(), state()} - def handle_connection(client_ref, state) do + @spec handle_connection(pid()) :: Ready.t() + def handle_connection(client_ref) do spec = [ child(:rtmp_source, %Membrane.RTMP.SourceBin{client_ref: client_ref}) |> via_out(:audio) @@ -45,32 +49,9 @@ defmodule Boombox.RTMP do video: get_child(:rtmp_source) |> via_out(:video) } - {%Ready{spec_builder: spec, track_builders: track_builders}, state} + %Ready{spec_builder: spec, track_builders: track_builders} end - # @spec handle_connection(pid(), :gen_tcp.socket() | :ssl.sslsocket(), state()) :: - # {Ready.t(), state()} - # def handle_connection(client_ref, state) do - # spec = [ - # child(:rtmp_source, %Membrane.RTMP.SourceBin{client_ref: client_ref}) - # |> via_out(:audio) - # |> child(Membrane.AAC.Parser) - # |> child(:aac_decoder, Membrane.AAC.FDK.Decoder) - # ] - - # track_builders = %{ - # audio: get_child(:aac_decoder), - # video: get_child(:rtmp_source) |> via_out(:video) - # } - - # {%Ready{spec_builder: spec, track_builders: track_builders}, state} - # end - - # def handle_connection(_server_pid, _socket, state) do - # send(state.server_pid, :rtmp_already_connected) - # {%Wait{}, state} - # end - @spec handle_socket_control(pid(), state()) :: Wait.t() def handle_socket_control(source_pid, state) do send(state.server_pid, {:rtmp_source_pid, source_pid})