From 2618977aefae127b22934ed9984e44a338ddd953 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Chali=C5=84ski?= Date: Thu, 22 Aug 2024 17:14:41 +0200 Subject: [PATCH] works --- lib/boombox/pipeline.ex | 8 ++++++-- lib/boombox/rtmp.ex | 3 +-- test/boombox_test.exs | 28 +++++++++++++--------------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/lib/boombox/pipeline.ex b/lib/boombox/pipeline.ex index 2dbf35c..df4731e 100644 --- a/lib/boombox/pipeline.ex +++ b/lib/boombox/pipeline.ex @@ -102,7 +102,7 @@ defmodule Boombox.Pipeline do output: opts |> Keyword.fetch!(:output) |> parse_output(), status: :init } - + IO.inspect(state.input) proceed(ctx, state) end @@ -272,6 +272,10 @@ defmodule Boombox.Pipeline do defp create_input({:rtmp, src}, ctx) do Boombox.RTMP.create_input(src, ctx.utility_supervisor) end + defp create_input(any, ctx) do + IO.inspect(any) + # Boombox.RTMP.create_input(src, ctx.utility_supervisor) + end @spec create_output(Boombox.output(), Membrane.Pipeline.CallbackContext.t()) :: Ready.t() | Wait.t() @@ -321,7 +325,7 @@ defmodule Boombox.Pipeline do end defp parse_input(input) when is_pid(input) do - input + {:rtmp, input} end defp parse_input(input) when is_tuple(input) do diff --git a/lib/boombox/rtmp.ex b/lib/boombox/rtmp.ex index 48a029b..e557d90 100644 --- a/lib/boombox/rtmp.ex +++ b/lib/boombox/rtmp.ex @@ -9,8 +9,7 @@ defmodule Boombox.RTMP do @spec create_input(String.t() | pid(), pid()) :: Wait.t() def create_input(client_ref, utility_supervisor) when is_pid(client_ref) do IO.puts("GOT PID #{inspect(client_ref)}") - - %Wait{} + handle_connection(client_ref) end def create_input(uri, utility_supervisor) do diff --git a/test/boombox_test.exs b/test/boombox_test.exs index 7308700..ba93ba3 100644 --- a/test/boombox_test.exs +++ b/test/boombox_test.exs @@ -130,37 +130,35 @@ defmodule BoomboxTest do async_test "rtmp -> mp4", %{tmp_dir: tmp} do output = Path.join(tmp, "output.mp4") url = "rtmp://localhost:5000/app/stream_key" + {use_ssl?, port, app, stream_key} = Membrane.RTMPServer.parse_url(url) - {use_ssl?, port, target_app, target_stream_key} = Membrane.RTMP.Utils.parse_url(url) - - boombox = self() + parent_process_pid = self() new_client_callback = fn client_ref, app, stream_key -> - if app == target_app and stream_key == target_stream_key do - send(boombox, {:rtmp_client_ref, client_ref}) - end + send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) end # start RTMP server to listen for connections - {:ok, _server} = + {:ok, server} = Membrane.RTMPServer.start_link( - handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()}, + handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: use_ssl?, new_client_callback: new_client_callback, - client_timeout: 1_000 + client_timeout: 9_000 ) - # send RTMP stream p = send_rtmp(url) # wait for a client to connect - client_ref = receive do - {:rtmp_client_ref, client_ref} -> client_ref - after - 5_000 -> "Didn't receive client_ref before timeout" - end + {:ok, client_ref} = + receive do + {:client_ref, client_ref, ^app, ^stream_key} -> + {:ok, client_ref} + after + 10_000 -> :timeout + end # run boombox with client_ref t = Task.async(fn -> Boombox.run(input: client_ref, output: output) end)