Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce v2 #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ config :firehose, Firehose.Manager,
delimiter: "\n", # Add delimiter after each event or add nothing if `false` or `nil` set
```

Add `Firehose.Manager` to your supervisor and thats all. You can emit events to AWS Firehose:
Add `Firehose.Supervisor` to your supervisor tree and that's it. You can emit events to AWS Firehose like this:

```elixir
Firehose.emit("logs", %{"name" => "pageview", "ua" => "..."})
Expand Down
1 change: 1 addition & 0 deletions lib/firehose.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
defmodule Firehose do
  @moduledoc """
  Public entry point for emitting events to AWS Firehose.

  All calls are forwarded to `Firehose.Manager`.
  """

  @doc "Emits `data` onto `stream` via the default manager."
  def emit(stream, data) do
    Firehose.Manager.emit(stream, data)
  end

  @doc "Emits `data` onto `stream` through the manager process `pid`."
  def emit(pid, stream, data) do
    Firehose.Manager.emit(pid, stream, data)
  end
end
4 changes: 4 additions & 0 deletions lib/firehose/backend.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defmodule Firehose.Backend do
  @moduledoc """
  Behaviour implemented by batch-delivery backends (see `Firehose.Backend.Aws`
  and `Firehose.Backend.Dummy`).
  """

  # sync/2 attempts to flush the emitter's pending batch, retrying up to the
  # given (integer) number of tries. `{:ok, :nothing}` means there was nothing
  # to send; `{:ok, :synced}` means the batch was delivered.
  @callback sync(Firehose.Emitter.t(), integer()) ::
              {:ok, :nothing | :synced} | {:error, :failure, String.t()}
end
84 changes: 84 additions & 0 deletions lib/firehose/backend/aws.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
defmodule Firehose.Backend.Aws do
  @moduledoc """
  `Firehose.Backend` implementation that delivers batches to AWS Kinesis
  Firehose via `ExAws.Firehose.put_record_batch/2`.

  A failed flush is retried with a one-second backoff until `tries` reaches
  zero; partially failed batches have only their failed records re-sent.
  """

  @behaviour Firehose.Backend

  require Logger

  alias Firehose.{Batch, Emitter, Settings}

  # Nothing accumulated yet — nothing to send.
  @impl Firehose.Backend
  def sync(%Emitter{batch: %Batch{size: 0}}, _tries), do: {:ok, :nothing}
  def sync(%Emitter{batch: %Batch{size: nil}}, _tries), do: {:ok, :nothing}

  # Retries exhausted: notify handlers and give up.
  def sync(%Emitter{} = state, 0) do
    Logger.error("[Firehose] [#{state.stream}] Cannot flush batch")

    for handler <- Settings.handlers() do
      apply(handler, :subscribe, [:failed, state.batch])
    end

    # BUGFIX: previously this clause returned the `for` comprehension's result
    # (a list), violating the `Firehose.Backend` callback contract and the
    # `{:error, _, _}` / `{:ok, _}` matches used by callers of sync/2.
    {:error, :failure, "retries exhausted"}
  end

  def sync(%Emitter{} = state, tries) do
    %Batch{stream: stream, current_record: record, records: records} = state.batch

    # `records` may be nil when only the current (unfinished) record exists.
    records = [record | records || []]

    case send_records(state, stream, for(record <- records, do: %{data: record.data})) do
      {:error, _, _} ->
        # Back off briefly, then retry the whole batch with one fewer try.
        Process.sleep(1_000)
        sync(state, tries - 1)

      {:ok, _} = result ->
        result
    end
  end

  # In debug mode only log the payload; otherwise actually call AWS.
  defp send_records(state, stream, records) do
    if Settings.debug() do
      Logger.debug("[Firehose] [#{stream}] Send records #{inspect(records)}")
      {:ok, :synced}
    else
      send_records_to_firehose(state, stream, records)
    end
  end

  defp send_records_to_firehose(state, stream, records) do
    result =
      stream
      |> ExAws.Firehose.put_record_batch(records)
      |> ExAws.request()

    Logger.debug("[Firehose] [#{stream}] AWS response: #{inspect(result)}")

    case result do
      {:ok, %{"FailedPutCount" => failed, "RequestResponses" => responses}} when failed > 0 ->
        # Re-send only the records AWS reported as failed. Responses are
        # positional, so the index maps a response back to its record.
        # NOTE(review): this recursion is not bounded by the caller's `tries`;
        # persistently failing records could retry indefinitely — confirm
        # whether that is acceptable.
        failed_records =
          for {response, index} <- Enum.with_index(responses),
              Map.get(response, "ErrorCode") != nil do
            Logger.debug("[Firehose] [#{stream}] failed record #{inspect(response)}")
            Enum.at(records, index)
          end

        send_records(state, stream, failed_records)

      {:ok, %{"FailedPutCount" => 0}} ->
        Logger.info("[Firehose] [#{stream}] Batch was successfully flushed")

        for handler <- Settings.handlers() do
          apply(handler, :subscribe, [:completed, state.batch])
        end

        {:ok, :synced}

      response ->
        # Transport/service-level failure: report and let sync/2 retry.
        Logger.error("[Firehose] [#{stream}] Batch was not flushed due to an error. Retry later...")

        for handler <- Settings.handlers() do
          apply(handler, :subscribe, [:error, state.batch, response])
        end

        {:error, :failure, response}
    end
  end
end
7 changes: 7 additions & 0 deletions lib/firehose/backend/dummy.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Firehose.Backend.Dummy do
  @moduledoc """
  No-op `Firehose.Backend`: discards every batch without contacting AWS.

  Useful for tests or environments where events should not be delivered.
  """

  # Removed an unused `require Logger` — this module never logs.
  @behaviour Firehose.Backend

  # Always reports that there was nothing to flush.
  @impl Firehose.Backend
  def sync(_emitter, _tries), do: {:ok, :nothing}
end
156 changes: 15 additions & 141 deletions lib/firehose/emitter.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
defmodule Firehose.Emitter do
use GenServer
require Logger
alias Firehose.{Emitter, Batch}

@settings Application.get_env(:firehose, Firehose.Manager) || []
@default [flush_interval: 1_000, retries: 5, serializer: Poison, delimiter: "\n", debug: false]

alias Firehose.{Emitter, Batch, Settings, Utils}
defstruct [:stream, :manager, :settings, :batch]

def start_link(stream, manager) do
Expand All @@ -15,27 +12,23 @@ defmodule Firehose.Emitter do
def init({stream, manager}) do
Process.flag(:trap_exit, true)

Logger.info(
"[Firehose] [#{stream}] Initialize emitter with settings: #{
inspect(Keyword.merge(@default, @settings))
}"
)
Logger.info("[Firehose] [#{stream}] Initialize emitter")

state = %Emitter{
stream: stream,
manager: manager,
batch: %Batch{stream: stream}
}

Process.send_after(self(), :sync, flush_interval())
Process.send_after(self(), :sync, Settings.flush_interval())

{:ok, state}
end

def emit(stream, data), do: emit(__MODULE__, stream, data)

def emit(pid, stream, data) when is_binary(stream) do
case serialize(data) do
case Utils.serialize(data) do
{:ok, data} ->
GenServer.call(pid, {:emit, stream, data})
{:ok, :emitted}
Expand All @@ -56,173 +49,54 @@ defmodule Firehose.Emitter do
{:reply, error, state}

{:error, reason} when reason in [:size_limit, :records_limit] ->
flush(state, retries())
flush(state, Settings.retries())
handle_call(args, from, %Emitter{state | batch: %Batch{stream: stream}})
end
end

def handle_info(:sync, %Emitter{batch: %Batch{size: 0}} = state) do
Process.send_after(self(), :sync, flush_interval())
Process.send_after(self(), :sync, Settings.flush_interval())
{:noreply, state}
end

def handle_info(:sync, %Emitter{batch: %Batch{size: nil}} = state) do
Process.send_after(self(), :sync, flush_interval())
Process.send_after(self(), :sync, Settings.flush_interval())
{:noreply, state}
end

def handle_info(:sync, %Emitter{stream: stream} = state) do
flush(state, state.settings[:retries])

Process.send_after(self(), :sync, flush_interval())
Process.send_after(self(), :sync, Settings.flush_interval())

{:noreply, %Emitter{state | batch: %Batch{stream: stream}}}
end

# we need this to catch termination
def handle_info({:EXIT, _pid, _}, state), do: {:noreply, state}

def terminate(reason, %Emitter{} = state) do
Logger.info(
"[Firehose] [#{state.batch.stream}] Terminating due to #{inspect(reason)}. Pending batch #{
debug(state)
}"
"[Firehose] [#{state.batch.stream}] Terminating due to #{inspect(reason)}. Pending batch #{debug(state)}"
)

# Flush events
send_sync(state, retries())
Settings.backend().sync(state, Settings.retries())

:ok
end

defp retries do
@settings[:retries] || @default[:retries]
end

defp flush_interval do
@settings[:flush_interval] || @default[:flush_interval]
end

defp handlers do
@settings[:handlers] || @default[:handlers] || []
end

defp delimiter do
@settings[:delimiter] || @default[:delimiter]
end

defp serializer do
@settings[:serializer] || @default[:serializer]
end

defp serialize(data) do
delimiter =
case delimiter() do
nil -> ""
delimiter -> delimiter
end

case serializer() do
nil ->
{:ok, data <> delimiter}

serializer ->
case serializer.encode(data) do
{:ok, data} ->
{:ok, data <> delimiter}

error ->
error
end
end
end

defp flush(%Emitter{} = state, tries) do
Logger.info("[Firehose] [#{state.batch.stream}] Flushing pending batch #{debug(state)}")

spawn(fn -> send_sync(state, tries) end)
end

defp send_sync(%Emitter{batch: %Batch{size: 0}}, _), do: :ok
defp send_sync(%Emitter{batch: %Batch{size: nil}}, _), do: :ok

defp send_sync(%Emitter{} = state, 0) do
Logger.error("[Firehose] [#{state.stream}] Cannot flush batch")

for handler <- handlers() do
apply(handler, :subscribe, [:failed, state.batch])
end
end

defp send_sync(%Emitter{} = state, tries) do
%Batch{stream: stream, current_record: record, records: records} = state.batch

records = records || []
records = [record | records]

case send_records(state, stream, for(record <- records, do: %{data: record.data})) do
{:error, _, _} ->
Process.sleep(1_000)
send_sync(state, tries - 1)

{:ok, _} = result ->
result
end
end

defp send_records(state, stream, records) do
if @settings[:debug] do
Logger.debug("[Firehose] [#{stream}] Send records #{inspect(records)}")
{:ok, :sended}
else
send_records_to_firehose(state, stream, records)
end
end

defp send_records_to_firehose(state, stream, records) do
result =
stream
|> ExAws.Firehose.put_record_batch(records)
|> ExAws.request()

Logger.debug("[Firehose] [#{stream}] AWS response: #{inspect(result)}")

case result do
{:ok, %{"FailedPutCount" => failed, "RequestResponses" => responses}} when failed > 0 ->
# We should resend failed records
failed_record =
for {record, index} <- Enum.with_index(responses),
Map.get(record, "ErrorCode") != nil do
Logger.debug("[Firehose] [#{stream}] failed record #{inspect(record)}")
Enum.at(records, index)
end

send_records(state, stream, failed_record)

{:ok, %{"FailedPutCount" => 0}} ->
Logger.info("[Firehose] [#{stream}] Batch was successfully flushed")

for handler <- handlers() do
apply(handler, :subscribe, [:completed, state.batch])
end

{:ok, :sended}

response ->
Logger.error("[Firehose] [#{stream}] Batch was not flushed due error. Retry later...")

for handler <- handlers() do
apply(handler, :subscribe, [:error, state.batch, response])
end

{:error, :failure, response}
end
spawn(fn -> Settings.backend().sync(state, tries) end)
end

defp debug(state) do
if state.batch.current_record == nil do
"(#{state.batch.records_size} records, #{state.batch.size} bytes)"
else
"(#{state.batch.records_size} records, #{state.batch.size} bytes, current record: #{
state.batch.current_record.size
} bytes)"
"(#{state.batch.records_size} records, #{state.batch.size} bytes, current record: #{state.batch.current_record.size} bytes)"
end
end
end
Loading