Skip to content

Commit

Permalink
works
Browse files Browse the repository at this point in the history
  • Loading branch information
bartkrak committed Aug 22, 2024
1 parent e98f1b9 commit 2618977
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
8 changes: 6 additions & 2 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ defmodule Boombox.Pipeline do
output: opts |> Keyword.fetch!(:output) |> parse_output(),
status: :init
}

IO.inspect(state.input)
proceed(ctx, state)
end

Expand Down Expand Up @@ -272,6 +272,10 @@ defmodule Boombox.Pipeline do
defp create_input({:rtmp, src}, ctx) do
Boombox.RTMP.create_input(src, ctx.utility_supervisor)
end
defp create_input(any, ctx) do
IO.inspect(any)
# Boombox.RTMP.create_input(src, ctx.utility_supervisor)
end

@spec create_output(Boombox.output(), Membrane.Pipeline.CallbackContext.t()) ::
Ready.t() | Wait.t()
Expand Down Expand Up @@ -321,7 +325,7 @@ defmodule Boombox.Pipeline do
end

defp parse_input(input) when is_pid(input) do
input
{:rtmp, input}
end

defp parse_input(input) when is_tuple(input) do
Expand Down
3 changes: 1 addition & 2 deletions lib/boombox/rtmp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ defmodule Boombox.RTMP do
@spec create_input(String.t() | pid(), pid()) :: Wait.t()
def create_input(client_ref, utility_supervisor) when is_pid(client_ref) do
IO.puts("GOT PID #{inspect(client_ref)}")

%Wait{}
handle_connection(client_ref)
end

def create_input(uri, utility_supervisor) do
Expand Down
28 changes: 13 additions & 15 deletions test/boombox_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,37 +130,35 @@ defmodule BoomboxTest do
async_test "rtmp -> mp4", %{tmp_dir: tmp} do
output = Path.join(tmp, "output.mp4")
url = "rtmp://localhost:5000/app/stream_key"
{use_ssl?, port, app, stream_key} = Membrane.RTMPServer.parse_url(url)

{use_ssl?, port, target_app, target_stream_key} = Membrane.RTMP.Utils.parse_url(url)

boombox = self()
parent_process_pid = self()

new_client_callback = fn client_ref, app, stream_key ->
if app == target_app and stream_key == target_stream_key do
send(boombox, {:rtmp_client_ref, client_ref})
end
send(parent_process_pid, {:client_ref, client_ref, app, stream_key})
end

# start RTMP server to listen for connections
{:ok, _server} =
{:ok, server} =
Membrane.RTMPServer.start_link(
handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()},
handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()},
port: port,
use_ssl?: use_ssl?,
new_client_callback: new_client_callback,
client_timeout: 1_000
client_timeout: 9_000
)


# send RTMP stream
p = send_rtmp(url)

# wait for a client to connect
client_ref = receive do
{:rtmp_client_ref, client_ref} -> client_ref
after
5_000 -> "Didn't receive client_ref before timeout"
end
{:ok, client_ref} =
receive do
{:client_ref, client_ref, ^app, ^stream_key} ->
{:ok, client_ref}
after
10_000 -> :timeout
end

# run boombox with client_ref
t = Task.async(fn -> Boombox.run(input: client_ref, output: output) end)
Expand Down

0 comments on commit 2618977

Please sign in to comment.