Skip to content

Commit

Permalink
little refactor, format
Browse files Browse the repository at this point in the history
  • Loading branch information
bartkrak committed Jul 17, 2024
1 parent b0901b3 commit da15082
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 47 deletions.
20 changes: 5 additions & 15 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,9 @@ defmodule Boombox.Pipeline do
end

@impl true
def handle_info({:rtmp_tcp_server, server_pid, socket}, ctx, state) do
{result, rtmp_input_state} =
Boombox.RTMP.handle_connection(server_pid, socket, state.rtmp_input_state)

proceed_result(result, ctx, %{state | rtmp_input_state: rtmp_input_state})
end

@impl true
def handle_info({:rtmp_client_ref, client_ref, app, stream_key}, ctx, state) do
{result, rtmp_input_state} =
Boombox.RTMP.handle_connection(client_ref, state)

proceed_result(result, ctx, %{state | rtmp_input_state: rtmp_input_state})
def handle_info({:rtmp_client_ref, client_ref}, ctx, state) do
Boombox.RTMP.handle_connection(client_ref)
|> proceed_result(ctx, state)
end

@impl true
Expand Down Expand Up @@ -273,8 +263,8 @@ defmodule Boombox.Pipeline do
Boombox.MP4.create_input(location)
end

defp create_input([:rtmp, uri], ctx) do
Boombox.RTMP.create_input(uri, ctx.utility_supervisor)
defp create_input([:rtmp, uri], _ctx) do
Boombox.RTMP.create_input(uri)
end

@spec create_output(Boombox.output(), Membrane.Pipeline.CallbackContext.t()) ::
Expand Down
45 changes: 13 additions & 32 deletions lib/boombox/rtmp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,28 @@ defmodule Boombox.RTMP do
@moduledoc false

import Membrane.ChildrenSpec
require Membrane.Logger
alias Boombox.Pipeline.{Ready, Wait}

@type state :: %{server_pid: pid()} | nil

@spec create_input(URI.t(), pid()) :: {Wait.t(), state()}
def create_input(uri, utility_supervisor) do

@spec create_input(URI.t()) :: Wait.t()
def create_input(uri) do
uri = URI.new!(uri)
[app | [stream_key]] = String.split(uri.path, "/", [trim: true])
[target_app | [target_stream_key]] = String.split(uri.path, "/", trim: true)

boombox = self()

new_client_callback = fn client_ref, app, stream_key ->
send(boombox, {:rtmp_client_ref, client_ref, app, stream_key})
if app == target_app and stream_key == target_stream_key do
send(boombox, {:rtmp_client_ref, client_ref})
else
Membrane.Logger.warning("Unexpected client connected on /#{app}/#{stream_key}")
end
end

# Run the standalone server
{:ok, server} =
{:ok, _server} =
Membrane.RTMP.Server.start_link(
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
port: uri.port,
Expand All @@ -31,8 +35,8 @@ defmodule Boombox.RTMP do
%Wait{}
end

# @spec handle_connection(pid(), :gen_tcp.socket() | :ssl.sslsocket(), state()) :: {Ready.t(), state()}
def handle_connection(client_ref, state) do
@spec handle_connection(pid()) :: Ready.t()
def handle_connection(client_ref) do
spec = [
child(:rtmp_source, %Membrane.RTMP.SourceBin{client_ref: client_ref})
|> via_out(:audio)
Expand All @@ -45,32 +49,9 @@ defmodule Boombox.RTMP do
video: get_child(:rtmp_source) |> via_out(:video)
}

{%Ready{spec_builder: spec, track_builders: track_builders}, state}
%Ready{spec_builder: spec, track_builders: track_builders}
end

# @spec handle_connection(pid(), :gen_tcp.socket() | :ssl.sslsocket(), state()) ::
# {Ready.t(), state()}
# def handle_connection(client_ref, state) do
# spec = [
# child(:rtmp_source, %Membrane.RTMP.SourceBin{client_ref: client_ref})
# |> via_out(:audio)
# |> child(Membrane.AAC.Parser)
# |> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
# ]

# track_builders = %{
# audio: get_child(:aac_decoder),
# video: get_child(:rtmp_source) |> via_out(:video)
# }

# {%Ready{spec_builder: spec, track_builders: track_builders}, state}
# end

# def handle_connection(_server_pid, _socket, state) do
# send(state.server_pid, :rtmp_already_connected)
# {%Wait{}, state}
# end

@spec handle_socket_control(pid(), state()) :: Wait.t()
def handle_socket_control(source_pid, state) do
send(state.server_pid, {:rtmp_source_pid, source_pid})
Expand Down

0 comments on commit da15082

Please sign in to comment.