From 67b7b4942cafe398cb0113e27819381df8d6ebdb Mon Sep 17 00:00:00 2001 From: Max Filipovich Date: Sat, 27 Nov 2021 13:39:26 +0100 Subject: [PATCH 1/2] Introduce v2 --- README.md | 2 +- lib/firehose.ex | 1 + lib/firehose/backend.ex | 4 + lib/firehose/backend/aws.ex | 84 ++++++++++++++++++ lib/firehose/backend/dummy.ex | 7 ++ lib/firehose/emitter.ex | 155 +++------------------------------- lib/firehose/manager.ex | 59 ++++--------- lib/firehose/settings.ex | 34 ++++++++ lib/firehose/supervisor.ex | 15 ++++ lib/firehose/utils.ex | 25 ++++++ mix.exs | 3 +- mix.lock | 27 +++--- test/firehose_test.exs | 14 +++ test/test_helper.exs | 3 + 14 files changed, 236 insertions(+), 197 deletions(-) create mode 100644 lib/firehose/backend.ex create mode 100644 lib/firehose/backend/aws.ex create mode 100644 lib/firehose/backend/dummy.ex create mode 100644 lib/firehose/settings.ex create mode 100644 lib/firehose/supervisor.ex create mode 100644 lib/firehose/utils.ex diff --git a/README.md b/README.md index 9b90c2c..573266a 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ config :firehose, Firehose.Manager, delimiter: "\n", # Add delimiter after each event or add nothing if `false` or `nil` set ``` -Add `Firehose.Manager` to your supervisor and thats all. You can emit events to AWS Firehose: +Add `Firehose.Supervisor` to your supervisor tree and thats it. You can emit events to AWS Firehose like this: ```elixir Firehose.emit("logs", %{"name" => "pageview", "ua" => "..."}) diff --git a/lib/firehose.ex b/lib/firehose.ex index c759770..aca48df 100644 --- a/lib/firehose.ex +++ b/lib/firehose.ex @@ -1,3 +1,4 @@ defmodule Firehose do defdelegate emit(stream, data), to: Firehose.Manager + defdelegate emit(pid, stream, data), to: Firehose.Manager end diff --git a/lib/firehose/backend.ex b/lib/firehose/backend.ex new file mode 100644 index 0000000..cff8ce7 --- /dev/null +++ b/lib/firehose/backend.ex @@ -0,0 +1,4 @@ +defmodule Firehose.Backend do + @callback sync(Firehose.Emitter.t(), integer()) :: + {:ok, :nothing | :synced} | {:error, :failure, String.t()} +end diff --git a/lib/firehose/backend/aws.ex b/lib/firehose/backend/aws.ex new file mode 100644 index 0000000..b09a17a --- /dev/null +++ b/lib/firehose/backend/aws.ex @@ -0,0 +1,84 @@ +defmodule Firehose.Backend.Aws do + @behaviour Firehose.Backend + + require Logger + + alias Firehose.{Batch, Emitter, Settings} + + @impl Firehose.Backend + def sync(%Emitter{batch: %Batch{size: 0}}, _), do: {:ok, :nothing} + def sync(%Emitter{batch: %Batch{size: nil}}, _), do: {:ok, :nothing} + + def sync(%Emitter{} = state, 0) do + Logger.error("[Firehose] [#{state.stream}] Cannot flush batch") + + for handler <- Settings.handlers() do + apply(handler, :subscribe, [:failed, state.batch]) + end + end + + def sync(%Emitter{} = state, tries) do + %Batch{stream: stream, current_record: record, records: records} = state.batch + + records = records || [] + records = [record | records] + + case send_records(state, stream, for(record <- records, do: %{data: record.data})) do + {:error, _, _} -> + Process.sleep(1_000) + sync(state, tries - 1) + + {:ok, _} = result -> + result + end + end + + defp send_records(state, stream, records) do + if Settings.debug() do + Logger.debug("[Firehose] [#{stream}] Send records #{inspect(records)}") + {:ok, :synced} + else + send_records_to_firehose(state, stream, records) + end + end + + defp send_records_to_firehose(state, stream, records) do + result = + stream + |> ExAws.Firehose.put_record_batch(records) + |> ExAws.request() + + Logger.debug("[Firehose] [#{stream}] AWS response: #{inspect(result)}") + + case result do + {:ok, %{"FailedPutCount" => failed, "RequestResponses" => responses}} when failed > 0 -> + # We should resend failed records + failed_record = + for {record, index} <- Enum.with_index(responses), + Map.get(record, "ErrorCode") != nil do + Logger.debug("[Firehose] [#{stream}] failed record #{inspect(record)}") + Enum.at(records, index) + end + + send_records(state, stream, failed_record) + + {:ok, %{"FailedPutCount" => 0}} -> + Logger.info("[Firehose] [#{stream}] Batch was successfully flushed") + + for handler <- Settings.handlers() do + apply(handler, :subscribe, [:completed, state.batch]) + end + + {:ok, :synced} + + response -> + Logger.error("[Firehose] [#{stream}] Batch was not flushed due error. Retry later...") + + for handler <- Settings.handlers() do + apply(handler, :subscribe, [:error, state.batch, response]) + end + + {:error, :failure, response} + end + end +end diff --git a/lib/firehose/backend/dummy.ex b/lib/firehose/backend/dummy.ex new file mode 100644 index 0000000..0f023f1 --- /dev/null +++ b/lib/firehose/backend/dummy.ex @@ -0,0 +1,7 @@ +defmodule Firehose.Backend.Dummy do + @behaviour Firehose.Backend + require Logger + + @impl Firehose.Backend + def sync(_, _), do: {:ok, :nothing} +end diff --git a/lib/firehose/emitter.ex b/lib/firehose/emitter.ex index 6c917ab..9d5714c 100644 --- a/lib/firehose/emitter.ex +++ b/lib/firehose/emitter.ex @@ -1,13 +1,12 @@ defmodule Firehose.Emitter do use GenServer require Logger - alias Firehose.{Emitter, Batch} - - @settings Application.get_env(:firehose, Firehose.Manager) || [] - @default [flush_interval: 1_000, retries: 5, serializer: Poison, delimiter: "\n", debug: false] + alias Firehose.{Emitter, Batch, Settings, Utils} defstruct [:stream, :manager, :settings, :batch] + @backend Settings.backend() + def start_link(stream, manager) do GenServer.start_link(__MODULE__, {stream, manager}, []) end @@ -15,11 +14,7 @@ defmodule Firehose.Emitter do def init({stream, manager}) do Process.flag(:trap_exit, true) - Logger.info( - "[Firehose] [#{stream}] Initialize emitter with settings: #{ - inspect(Keyword.merge(@default, @settings)) - }" - ) + Logger.info("[Firehose] [#{stream}] Initialize emitter") state = %Emitter{ stream: stream, @@ -27,7 +22,7 @@ defmodule Firehose.Emitter do batch: %Batch{stream: stream} } - Process.send_after(self(), :sync, flush_interval()) + Process.send_after(self(), :sync, Settings.flush_interval()) {:ok, state} end @@ -35,7 +30,7 @@ defmodule Firehose.Emitter do def emit(stream, data), do: emit(__MODULE__, stream, data) def emit(pid, stream, data) when is_binary(stream) do - case serialize(data) do + case Utils.serialize(data) do {:ok, data} -> GenServer.call(pid, {:emit, stream, data}) {:ok, :emitted} @@ -56,173 +51,51 @@ defmodule Firehose.Emitter do {:reply, error, state} {:error, reason} when reason in [:size_limit, :records_limit] -> - flush(state, retries()) + flush(state, Settings.retries()) handle_call(args, from, %Emitter{state | batch: %Batch{stream: stream}}) end end def handle_info(:sync, %Emitter{batch: %Batch{size: 0}} = state) do - Process.send_after(self(), :sync, flush_interval()) + Process.send_after(self(), :sync, Settings.flush_interval()) {:noreply, state} end def handle_info(:sync, %Emitter{batch: %Batch{size: nil}} = state) do - Process.send_after(self(), :sync, flush_interval()) + Process.send_after(self(), :sync, Settings.flush_interval()) {:noreply, state} end def handle_info(:sync, %Emitter{stream: stream} = state) do flush(state, state.settings[:retries]) - Process.send_after(self(), :sync, flush_interval()) + Process.send_after(self(), :sync, Settings.flush_interval()) {:noreply, %Emitter{state | batch: %Batch{stream: stream}}} end def terminate(reason, %Emitter{} = state) do Logger.info( - "[Firehose] [#{state.batch.stream}] Terminating due to #{inspect(reason)}. Pending batch #{ - debug(state) - }" + "[Firehose] [#{state.batch.stream}] Terminating due to #{inspect(reason)}. Pending batch #{debug(state)}" ) # Flush events - send_sync(state, retries()) + @backend.sync(state, Settings.retries()) :ok end - defp retries do - @settings[:retries] || @default[:retries] - end - - defp flush_interval do - @settings[:flush_interval] || @default[:flush_interval] - end - - defp handlers do - @settings[:handlers] || @default[:handlers] || [] - end - - defp delimiter do - @settings[:delimiter] || @default[:delimiter] - end - - defp serializer do - @settings[:serializer] || @default[:serializer] - end - - defp serialize(data) do - delimiter = - case delimiter() do - nil -> "" - delimiter -> delimiter - end - - case serializer() do - nil -> - {:ok, data <> delimiter} - - serializer -> - case serializer.encode(data) do - {:ok, data} -> - {:ok, data <> delimiter} - - error -> - error - end - end - end - defp flush(%Emitter{} = state, tries) do Logger.info("[Firehose] [#{state.batch.stream}] Flushing pending batch #{debug(state)}") - spawn(fn -> send_sync(state, tries) end) - end - - defp send_sync(%Emitter{batch: %Batch{size: 0}}, _), do: :ok - defp send_sync(%Emitter{batch: %Batch{size: nil}}, _), do: :ok - - defp send_sync(%Emitter{} = state, 0) do - Logger.error("[Firehose] [#{state.stream}] Cannot flush batch") - - for handler <- handlers() do - apply(handler, :subscribe, [:failed, state.batch]) - end - end - - defp send_sync(%Emitter{} = state, tries) do - %Batch{stream: stream, current_record: record, records: records} = state.batch - - records = records || [] - records = [record | records] - - case send_records(state, stream, for(record <- records, do: %{data: record.data})) do - {:error, _, _} -> - Process.sleep(1_000) - send_sync(state, tries - 1) - - {:ok, _} = result -> - result - end - end - - defp send_records(state, stream, records) do - if @settings[:debug] do - Logger.debug("[Firehose] [#{stream}] Send records #{inspect(records)}") - {:ok, :sended} - else - send_records_to_firehose(state, stream, records) - end - end - - defp send_records_to_firehose(state, stream, records) do - result = - stream - |> ExAws.Firehose.put_record_batch(records) - |> ExAws.request() - - Logger.debug("[Firehose] [#{stream}] AWS response: #{inspect(result)}") - - case result do - {:ok, %{"FailedPutCount" => failed, "RequestResponses" => responses}} when failed > 0 -> - # We should resend failed records - failed_record = - for {record, index} <- Enum.with_index(responses), - Map.get(record, "ErrorCode") != nil do - Logger.debug("[Firehose] [#{stream}] failed record #{inspect(record)}") - Enum.at(records, index) - end - - send_records(state, stream, failed_record) - - {:ok, %{"FailedPutCount" => 0}} -> - Logger.info("[Firehose] [#{stream}] Batch was successfully flushed") - - for handler <- handlers() do - apply(handler, :subscribe, [:completed, state.batch]) - end - - {:ok, :sended} - - response -> - Logger.error("[Firehose] [#{stream}] Batch was not flushed due error. Retry later...") - - for handler <- handlers() do - apply(handler, :subscribe, [:error, state.batch, response]) - end - - {:error, :failure, response} - end + spawn(fn -> @backend.sync(state, tries) end) end defp debug(state) do if state.batch.current_record == nil do "(#{state.batch.records_size} records, #{state.batch.size} bytes)" else - "(#{state.batch.records_size} records, #{state.batch.size} bytes, current record: #{ - state.batch.current_record.size - } bytes)" + "(#{state.batch.records_size} records, #{state.batch.size} bytes, current record: #{state.batch.current_record.size} bytes)" end end end diff --git a/lib/firehose/manager.ex b/lib/firehose/manager.ex index de8a210..0cb62d2 100644 --- a/lib/firehose/manager.ex +++ b/lib/firehose/manager.ex @@ -1,56 +1,33 @@ defmodule Firehose.Manager do - use GenServer require Logger - def emit(stream, data), do: emit(__MODULE__, stream, data) + alias Firehose.Emitter - def start_link(options \\ []) do - GenServer.start_link(__MODULE__, options, name: __MODULE__) - end - - def start_link(name, options) do - GenServer.start_link(__MODULE__, options, name: name) - end - - def init(_opts) do - Process.flag(:trap_exit, true) - - {:ok, %{}} + def emit(stream, data) when is_binary(stream) do + Firehose.Supervisor + |> Supervisor.which_children() + |> route_event(Firehose.Supervisor, stream, data) end def emit(pid, stream, data) when is_binary(stream) do - GenServer.call(pid, {:emit, stream, data}) + pid + |> Supervisor.which_children() + |> route_event(pid, stream, data) end - def handle_call({:emit, stream, data}, _from, state) do - case state[stream] do + defp route_event(streams, pid, stream, data) do + case Enum.find(streams, fn {id, _, _, _} -> id == stream end) do nil -> - {pid, state} = create_emitter_for(stream, state) - Firehose.Emitter.emit(pid, stream, data) - - {:reply, :ok, state} + {:ok, child_pid} = + Supervisor.start_child(pid, %{ + id: stream, + start: {Emitter, :start_link, [stream, Supervisor]} + }) - pid when is_pid(pid) -> - if Process.alive?(pid) do - Firehose.Emitter.emit(pid, stream, data) - {:reply, :ok, state} - else - {pid, state} = create_emitter_for(stream, state) - Firehose.Emitter.emit(pid, stream, data) + Emitter.emit(child_pid, stream, data) - {:reply, :ok, state} - end + {_, pid, _, _} -> + Emitter.emit(pid, stream, data) end end - - def terminate(_) do - Logger.info("#{__MODULE__} terminating...") - - :ok - end - - defp create_emitter_for(stream, state) do - {:ok, pid} = Firehose.Emitter.start_link(stream, __MODULE__) - {pid, Map.put(state, stream, pid)} - end end diff --git a/lib/firehose/settings.ex b/lib/firehose/settings.ex new file mode 100644 index 0000000..016f52f --- /dev/null +++ b/lib/firehose/settings.ex @@ -0,0 +1,34 @@ +defmodule Firehose.Settings do + @default [flush_interval: 1_000, retries: 5, serializer: Poison, delimiter: "\n", debug: false] + + def debug, do: settings(:debug) || false + + def retries do + settings(:retries) || @default[:retries] + end + + def flush_interval do + settings(:flush_interval) || @default[:flush_interval] + end + + def handlers do + settings(:handlers) || @default[:handlers] || [] + end + + def delimiter do + settings(:delimiter) || @default[:delimiter] + end + + def serializer do + settings(:serializer) || @default[:serializer] + end + + def backend() do + settings(:backend) || Firehose.Backend.Aws + end + + defp settings(key) do + env = Application.get_env(:firehose, Firehose.Manager) || [] + Keyword.get(env, key) + end +end diff --git a/lib/firehose/supervisor.ex b/lib/firehose/supervisor.ex new file mode 100644 index 0000000..1cf559c --- /dev/null +++ b/lib/firehose/supervisor.ex @@ -0,0 +1,15 @@ +defmodule Firehose.Supervisor do + use Supervisor + + def start_link() do + Supervisor.start_link(__MODULE__, [], name: __MODULE__) + end + + @impl true + def init(_args) do + # we will add emitters on the fly + children = [] + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/lib/firehose/utils.ex b/lib/firehose/utils.ex new file mode 100644 index 0000000..c92824b --- /dev/null +++ b/lib/firehose/utils.ex @@ -0,0 +1,25 @@ +defmodule Firehose.Utils do + alias Firehose.Settings + + def serialize(data) do + delimiter = + case Settings.delimiter() do + nil -> "" + delimiter -> delimiter + end + + case Settings.serializer() do + nil -> + {:ok, data <> delimiter} + + serializer -> + case serializer.encode(data) do + {:ok, data} -> + {:ok, data <> delimiter} + + error -> + error + end + end + end +end diff --git a/mix.exs b/mix.exs index 1414c32..538d899 100644 --- a/mix.exs +++ b/mix.exs @@ -37,7 +37,8 @@ defmodule Firehose.MixProject do {:ex_aws_firehose, "~> 2.0"}, {:poison, "~> 3.0"}, {:hackney, "~> 1.9"}, - {:ex_doc, ">= 0.0.0", only: :dev} + {:ex_doc, ">= 0.0.0", only: :dev}, + {:mox, "~> 1.0.1", only: :test} ] end end diff --git a/mix.lock b/mix.lock index 13720f3..8e7fb1c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,15 +1,16 @@ %{ - "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"}, - "earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"}, - "ex_aws": {:hex, :ex_aws, "2.0.2", "8df2f96f58624a399abe5a0ce26db648ee848aca6393b9c65c939ece9ac07bfa", [:mix], [{:configparser_ex, "~> 2.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "1.6.3 or 1.6.5 or 1.7.1 or 1.8.6 or ~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: true]}, {:poison, ">= 1.2.0", [hex: :poison, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.6", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:xml_builder, "~> 0.1.0", [hex: :xml_builder, repo: "hexpm", optional: true]}], "hexpm"}, - "ex_aws_firehose": {:hex, :ex_aws_firehose, "2.0.0", "afb1b2520a0f3965d40916370c55843b7202dc703ca3b20b63aa793a11124934", [:mix], [{:ex_aws, "~> 2.0.0", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, - "hackney": {:hex, :hackney, "1.12.1", "8bf2d0e11e722e533903fe126e14d6e7e94d9b7983ced595b75f532e04b7fdc7", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, - "idna": {:hex, :idna, "5.1.1", "cbc3b2fa1645113267cc59c760bafa64b2ea0334635ef06dbac8801e42f7279c", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, - "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, - "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"}, - "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm"}, - "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, - "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"}, - "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"}, + "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "e12d667d042c11d130594bae2b0097e63836fe8b1e6d6b2cc48f8bb7a2cf7d68"}, + "earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm", "c57508ddad47dfb8038ca6de1e616e66e9b87313220ac5d9817bc4a4dc2257b9"}, + "ex_aws": {:hex, :ex_aws, "2.0.2", "8df2f96f58624a399abe5a0ce26db648ee848aca6393b9c65c939ece9ac07bfa", [:mix], [{:configparser_ex, "~> 2.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "1.6.3 or 1.6.5 or 1.7.1 or 1.8.6 or ~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: true]}, {:poison, ">= 1.2.0", [hex: :poison, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.6", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:xml_builder, "~> 0.1.0", [hex: :xml_builder, repo: "hexpm", optional: true]}], "hexpm", "e32830626ef09d1ed843d686f31b6b226cabc001792c5a81d3ae9d52e9877644"}, + "ex_aws_firehose": {:hex, :ex_aws_firehose, "2.0.0", "afb1b2520a0f3965d40916370c55843b7202dc703ca3b20b63aa793a11124934", [:mix], [{:ex_aws, "~> 2.0.0", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm", "c39be12ccd9ddf56ea78f473eb7e15c4261bf9b993284df0ce0f55b0c96fd9ab"}, + "ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm", "33d7b70d87d45ed899180fb0413fb77c7c48843188516e15747e00fdecf572b6"}, + "hackney": {:hex, :hackney, "1.12.1", "8bf2d0e11e722e533903fe126e14d6e7e94d9b7983ced595b75f532e04b7fdc7", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "8d5c94391a1dd525e58713b4fb43be9a930360ea8e74d0474e535ff579df6071"}, + "idna": {:hex, :idna, "5.1.1", "cbc3b2fa1645113267cc59c760bafa64b2ea0334635ef06dbac8801e42f7279c", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "12197a282ab74a30dbe5853ec4d1dca3332f1fdc8ebed682c083e467d64f6491"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm", "7a4c8e1115a2732a67d7624e28cf6c9f30c66711a9e92928e745c255887ba465"}, + "mox": {:hex, :mox, "1.0.1", "b651bf0113265cda0ba3a827fcb691f848b683c373b77e7d7439910a8d754d6e", [:mix], [], "hexpm", "35bc0dea5499d18db4ef7fe4360067a59b06c74376eb6ab3bd67e6295b133469"}, + "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm", "578b1d484720749499db5654091ddac818ea0b6d568f2c99c562d2a6dd4aa117"}, + "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm", "fec8660eb7733ee4117b85f55799fd3833eb769a6df71ccf8903e8dc5447cfce"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm", "4f8805eb5c8a939cf2359367cb651a3180b27dfb48444846be2613d79355d65e"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm", "da1d9bef8a092cc7e1e51f1298037a5ddfb0f657fe862dfe7ba4c5807b551c29"}, } diff --git a/test/firehose_test.exs b/test/firehose_test.exs index a50f5f4..e6d0398 100644 --- a/test/firehose_test.exs +++ b/test/firehose_test.exs @@ -1,3 +1,17 @@ defmodule FirehoseTest do use ExUnit.Case + + import Mox + + setup do + {:ok, pid} = Supervisor.start_link(Firehose.Supervisor, []) + + [pid: pid] + end + + test "it create event emitter", %{pid: pid} do + Firehose.emit(pid, "topic", "test") + + assert [{"topic", _pid, _type, [Firehose.Emitter]}] = Supervisor.which_children(pid) + end end diff --git a/test/test_helper.exs b/test/test_helper.exs index 869559e..3bb729d 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1,4 @@ ExUnit.start() + +Mox.defmock(Firehose.Backend.Mock, for: Firehose.Backend) +Application.put_env(:firehose, Firehose.Manager, backend: Firehose.Backend.Mock) From 2d3953b2e335629f0f15eeb70fdbaff7bc0be505 Mon Sep 17 00:00:00 2001 From: Max Filipovich Date: Sat, 27 Nov 2021 20:00:52 +0100 Subject: [PATCH 2/2] Make testing that verify restarting stream emitter --- lib/firehose/emitter.ex | 9 +++++---- lib/firehose/settings.ex | 30 ++++++------------------------ test/firehose_test.exs | 20 ++++++++++++++++++-- test/test_helper.exs | 5 +++-- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/lib/firehose/emitter.ex b/lib/firehose/emitter.ex index 9d5714c..47501b3 100644 --- a/lib/firehose/emitter.ex +++ b/lib/firehose/emitter.ex @@ -5,8 +5,6 @@ defmodule Firehose.Emitter do alias Firehose.{Emitter, Batch, Settings, Utils} defstruct [:stream, :manager, :settings, :batch] - @backend Settings.backend() - def start_link(stream, manager) do GenServer.start_link(__MODULE__, {stream, manager}, []) end @@ -74,13 +72,16 @@ defmodule Firehose.Emitter do {:noreply, %Emitter{state | batch: %Batch{stream: stream}}} end + # we need this to catch termination + def handle_info({:EXIT, _pid, _}, state), do: {:noreply, state} + def terminate(reason, %Emitter{} = state) do Logger.info( "[Firehose] [#{state.batch.stream}] Terminating due to #{inspect(reason)}. Pending batch #{debug(state)}" ) # Flush events - @backend.sync(state, Settings.retries()) + Settings.backend().sync(state, Settings.retries()) :ok end @@ -88,7 +89,7 @@ defmodule Firehose.Emitter do defp flush(%Emitter{} = state, tries) do Logger.info("[Firehose] [#{state.batch.stream}] Flushing pending batch #{debug(state)}") - spawn(fn -> @backend.sync(state, tries) end) + spawn(fn -> Settings.backend().sync(state, tries) end) end defp debug(state) do diff --git a/lib/firehose/settings.ex b/lib/firehose/settings.ex index 016f52f..e3ef029 100644 --- a/lib/firehose/settings.ex +++ b/lib/firehose/settings.ex @@ -2,30 +2,12 @@ defmodule Firehose.Settings do @default [flush_interval: 1_000, retries: 5, serializer: Poison, delimiter: "\n", debug: false] def debug, do: settings(:debug) || false - - def retries do - settings(:retries) || @default[:retries] - end - - def flush_interval do - settings(:flush_interval) || @default[:flush_interval] - end - - def handlers do - settings(:handlers) || @default[:handlers] || [] - end - - def delimiter do - settings(:delimiter) || @default[:delimiter] - end - - def serializer do - settings(:serializer) || @default[:serializer] - end - - def backend() do - settings(:backend) || Firehose.Backend.Aws - end + def retries, do: settings(:retries) || @default[:retries] + def flush_interval, do: settings(:flush_interval) || @default[:flush_interval] + def handlers, do: settings(:handlers) || @default[:handlers] || [] + def delimiter, do: settings(:delimiter) || @default[:delimiter] + def serializer, do: settings(:serializer) || @default[:serializer] + def backend, do: settings(:backend) || Firehose.Backend.Aws defp settings(key) do env = Application.get_env(:firehose, Firehose.Manager) || [] diff --git a/test/firehose_test.exs b/test/firehose_test.exs index e6d0398..a7e8912 100644 --- a/test/firehose_test.exs +++ b/test/firehose_test.exs @@ -1,8 +1,6 @@ defmodule FirehoseTest do use ExUnit.Case - import Mox - setup do {:ok, pid} = Supervisor.start_link(Firehose.Supervisor, []) @@ -14,4 +12,22 @@ defmodule FirehoseTest do assert [{"topic", _pid, _type, [Firehose.Emitter]}] = Supervisor.which_children(pid) end + + test "it respawn event emitter", %{pid: pid} do + Firehose.emit(pid, "topic1", "test1") + Firehose.emit(pid, "topic2", "test2") + + [ + {"topic2", child_pid1, _, [Firehose.Emitter]}, + {"topic1", _child_pid2, _, [Firehose.Emitter]} + ] = Supervisor.which_children(pid) + + # Killing emitter process + Process.exit(child_pid1, :normal) + + assert [ + {"topic2", _, _, [Firehose.Emitter]}, + {"topic1", _, _, [Firehose.Emitter]} + ] = Supervisor.which_children(pid) + end end diff --git a/test/test_helper.exs b/test/test_helper.exs index 3bb729d..ce1f8b1 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,4 +1,5 @@ ExUnit.start() -Mox.defmock(Firehose.Backend.Mock, for: Firehose.Backend) -Application.put_env(:firehose, Firehose.Manager, backend: Firehose.Backend.Mock) +# Mox.defmock(Firehose.Backend.Dummy, for: Firehose.Backend) + +Application.put_env(:firehose, Firehose.Manager, [backend: Firehose.Backend.Dummy])