Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UDP packet size limit #52

Merged
merged 1 commit into from
May 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ This reporter pushes data to [InfluxDB](https://influxdb.com/index.html).

Available options:

* __protocol__ - `http`, `https` or `udp` for operating with InfluxDB. `http` by default. If you use `udp`, check __udp_mtu__ below to avoid `{error,emsgsize}`.
* __host__ - InfluxDB host. `127.0.0.1` by default.
* __protocol__ - `http`, `https` or `udp` for operating with InfluxDB. `http` by default.
* __port__ - InfluxDB port. `8086` by default.
* __db__ - Database on InfluxDB for writing data. `exometer` by default.
* __username__ - Username for authorization on InfluxDB.
* __password__ - Password for authorization on InfluxDB.
* __timestamping__ - Enable timestamping, `false` by default. To enable `timestamping` with the reporter you can use `true` or `{true, Precision}` where `Precision` is a unit taken from `[n,u,ms,s,m,h]`. The default unit is `u`.
* __batch_window_size__ - set window size in ms for batch sending. This means the reporter will collect measurements within this interval and send all measurements in one packet. `0` by default.
* __batch_window_size__ - Set window size in ms for batch sending. This means the reporter will collect measurements within this interval and send all measurements in one packet. `0` by default.
* __udp_mtu__ - (Used only with __protocol__ == `udp`.) MTU of the network interface through which UDP packets flow to the __host__. `65536` by default (Linux loopback interface MTU). Run `ifconfig` on the machine where this app will run to find it out. Metrics will be sent out if their size (in the Line Protocol format) exceeds this value, even if the current __batch_window_size__ is not over yet. (They will also be sent out at the end of __batch_window_size__ as usual, regardless of their size.)

The following options can be set globally in the reporter config or locally in a specific subscription. The latter case overwrites the first.

Expand Down
2 changes: 2 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@
]}
]}
]}.

{cover_enabled, true}.
105 changes: 47 additions & 58 deletions src/exometer_report_influxdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

-ifdef(TEST).
-export([evaluate_subscription_options/5,
make_packet/5]).
make_packet/5,
maybe_send/4,
name/1]).
-endif.

-include_lib("exometer_core/include/exometer.hrl").
Expand All @@ -33,40 +35,25 @@
-define(DEFAULT_FORMATTING, []).
-define(DEFAULT_TIMESTAMP_OPT, false).
-define(DEFAULT_BATCH_WINDOW_SIZE, 0).
-define(DEFAULT_UDP_MTU, 65536).
-define(DEFAULT_AUTOSUBSCRIBE, false).
-define(DEFAULT_SUBSCRIPTIONS_MOD, undefined).

-define(VALID_PRECISIONS, [n, u, ms, s, m, h]).

% https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
-define(MAX_UDP_PACKET_SIZE, 65535).
-define(UDP_HEADER_SIZE, 8).
-define(IP_HEADER_SIZE, 20).

-define(HTTP(Proto), (Proto =:= http orelse Proto =:= https)).

-include("state.hrl").
-include("log.hrl").

-type options() :: [{atom(), any()}].
-type value() :: any().
-type callback_result() :: {ok, state()} | any().
-type precision() :: n | u | ms | s | m | h.
-type protocol() :: http | udp.

-record(state, {protocol :: protocol(),
db :: binary(), % for http
username :: undefined | binary(), % for http
password :: undefined | binary(), % for http
host :: inet:ip_address() | inet:hostname(), % for udp
port :: inet:port_number(), % for udp
timestamping :: boolean(),
precision :: precision(),
collected_metrics = #{} :: map(),
batch_window_size = 0 :: integer(),
tags :: map(),
series_name :: atom() | binary(),
formatting :: list(),
metrics :: map(),
autosubscribe :: boolean(),
subscriptions_module :: module(),
connection :: gen_udp:socket() | reference()}).
-type state() :: #state{}.


%% ===================================================================
%% Public API
Expand All @@ -81,6 +68,7 @@ exometer_init(Opts) ->
Password = get_opt(password, Opts, ?DEFAULT_PASSWORD),
TimestampOpt = get_opt(timestamping, Opts, ?DEFAULT_TIMESTAMP_OPT),
BatchWinSize = get_opt(batch_window_size, Opts, ?DEFAULT_BATCH_WINDOW_SIZE),
UDP_MTU = get_opt(udp_mtu, Opts, ?DEFAULT_UDP_MTU),
{Timestamping, Precision} = evaluate_timestamp_opt(TimestampOpt),
Tags = [{key(Key), Value} || {Key, Value} <- get_opt(tags, Opts, [])],
SeriesName = get_opt(series_name, Opts, ?DEFAULT_SERIES_NAME),
Expand All @@ -100,6 +88,7 @@ exometer_init(Opts) ->
series_name = SeriesName,
formatting = Formatting,
batch_window_size = BatchWinSize,
max_udp_size = max_udp_size(UDP_MTU),
autosubscribe = Autosubscribe,
subscriptions_module = SubscriptionsMod,
metrics = maps:new()},
Expand Down Expand Up @@ -128,7 +117,7 @@ exometer_report(Metric, DataPoint, _Extra, Value,
#state{metrics = Metrics} = State) ->
case maps:get(Metric, Metrics, not_found) of
{MetricName, Tags} ->
maybe_send(Metric, MetricName, Tags,
maybe_send(MetricName, Tags,
maps:from_list([{DataPoint, Value}]), State);
Error ->
?warning("InfluxDB reporter got trouble when looking ~p metric's tag: ~p",
Expand Down Expand Up @@ -175,13 +164,9 @@ exometer_cast(_Unknown, State) ->
exometer_info({exometer_influxdb, reconnect}, State) ->
reconnect(State);
exometer_info({exometer_influxdb, send},
#state{precision = Precision,
collected_metrics = CollectedMetrics} = State) ->
if CollectedMetrics /= #{} ->
Packets = [make_packet(MetricName, Tags, Fileds, Timestamping, Precision) ++ "\n"
|| {_, {MetricName, Tags, Fileds, Timestamping}}
<- maps:to_list(CollectedMetrics)],
send(Packets, State#state{collected_metrics = #{}});
#state{collected_metrics = CollectedMetrics} = State) ->
if size(CollectedMetrics) > 0 ->
send(CollectedMetrics, State#state{collected_metrics = <<>>});
true -> {ok, State}
end;
exometer_info(_Unknown, State) ->
Expand Down Expand Up @@ -272,36 +257,35 @@ prepare_batch_send(Time) ->
prepare_reconnect() ->
erlang:send_after(1000, self(), {exometer_influxdb, reconnect}).

-spec maybe_send(list(), list(), map(), map(), state()) ->
-spec maybe_send(list(), map(), map(), state()) ->
{ok, state()} | {error, term()}.
maybe_send(OriginMetricName, MetricName, Tags0, Fields,
#state{batch_window_size = BatchWinSize,
maybe_send(MetricName, Tags, Fields,
#state{batch_window_size = 0,
precision = Precision,
timestamping = Timestamping} = State) ->
Packet = make_packet(MetricName, Tags, Fields, Timestamping andalso unix_time(Precision), Precision),
send(Packet, State);
maybe_send(MetricName, Tags, Fields,
#state{protocol = Protocol,
batch_window_size = BatchWindowSize,
max_udp_size = MaxUDPSize,
precision = Precision,
timestamping = Timestamping,
collected_metrics = CollectedMetrics} = State)
when BatchWinSize > 0 ->
NewCollectedMetrics = case maps:get(OriginMetricName, CollectedMetrics, not_found) of
{MetricName, Tags, Fields1} ->
NewFields = maps:merge(Fields, Fields1),
maps:put(OriginMetricName,
{MetricName, Tags, NewFields, Timestamping andalso unix_time(Precision)},
CollectedMetrics);
{MetricName, Tags, Fields1, _OrigTimestamp} ->
NewFields = maps:merge(Fields, Fields1),
maps:put(OriginMetricName,
{MetricName, Tags, NewFields, Timestamping andalso unix_time(Precision)},
CollectedMetrics);
not_found ->
maps:put(OriginMetricName,
{MetricName, Tags0, Fields, Timestamping andalso unix_time(Precision)},
CollectedMetrics)
end,
maps:size(CollectedMetrics) == 0 andalso prepare_batch_send(BatchWinSize),
{ok, State#state{collected_metrics = NewCollectedMetrics}};
maybe_send(_, MetricName, Tags, Fields,
#state{timestamping = Timestamping, precision = Precision} = State) ->
Packet = make_packet(MetricName, Tags, Fields, Timestamping, Precision),
send(Packet, State).
collected_metrics = CollectedMetrics} = State) ->
maybe_start_new_window(BatchWindowSize, CollectedMetrics),
Packet = make_packet(MetricName, Tags, Fields, Timestamping andalso unix_time(Precision), Precision),
BinaryPacket = list_to_binary(Packet),
NewCollectedMetrics = <<CollectedMetrics/binary, BinaryPacket/binary, "\n">>,
if
Protocol == udp andalso size(CollectedMetrics) > 0 andalso size(NewCollectedMetrics) > MaxUDPSize ->
send(CollectedMetrics, State#state{collected_metrics = <<BinaryPacket/binary, "\n">>});
true ->
{ok, State#state{collected_metrics = NewCollectedMetrics}}
end.

maybe_start_new_window(Window, Metrics) when size(Metrics) == 0 ->
prepare_batch_send(Window);
maybe_start_new_window(_, _) -> ok.

-spec send(binary() | list(), state()) ->
{ok, state()} | {error, term()}.
Expand Down Expand Up @@ -338,6 +322,11 @@ send(Packet, #state{protocol = udp, connection = Socket,
end;
send(_, #state{protocol = Protocol}) -> {error, {Protocol, not_supported}}.

max_udp_size(MTU) when MTU > ?MAX_UDP_PACKET_SIZE ->
max_udp_size(?MAX_UDP_PACKET_SIZE);
max_udp_size(MaxPacketSize) ->
MaxPacketSize - ?UDP_HEADER_SIZE - ?IP_HEADER_SIZE.

-spec merge_tags(list() | map(), list() | map()) -> map().
merge_tags(Tags, AdditionalTags) when is_list(Tags) ->
merge_tags(maps:from_list(Tags), AdditionalTags);
Expand Down
23 changes: 23 additions & 0 deletions src/state.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-type precision() :: n | u | ms | s | m | h.
-type protocol() :: http | udp.

-record(state, {protocol :: protocol(),
db :: binary(), % for http
username :: undefined | binary(), % for http
password :: undefined | binary(), % for http
host :: inet:ip_address() | inet:hostname(), % for udp
port :: inet:port_number(), % for udp
timestamping :: boolean(),
precision :: precision(),
collected_metrics = <<>> :: binary(),
batch_window_size :: non_neg_integer(),
max_udp_size :: pos_integer(),
tags :: map(),
series_name :: atom() | binary(),
formatting :: list(),
metrics :: map(),
autosubscribe :: boolean(),
subscriptions_module :: module(),
connection :: gen_udp:socket() | reference()}).

-type state() :: #state{}.
46 changes: 45 additions & 1 deletion test/exometer_influxdb_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("exometer_core/include/exometer.hrl").

-include("state.hrl").

-import(exometer_report_influxdb, [evaluate_subscription_options/5,
make_packet/5]).
exometer_info/2,
make_packet/5,
maybe_send/4,
name/1]).


evaluate_subscription_options(MetricId, Options) ->
Expand Down Expand Up @@ -145,5 +150,44 @@ subscribtions_module_test() ->
gen_udp:close(Socket),
ok.

send_udp_packet_when_it_hits_size_limit_test() ->
TestServerPort = 55555,
{ok, ServerSocket} = gen_udp:open(TestServerPort, [{active, false}]),
{ok, ClientSocket} = gen_udp:open(0),
State1 = #state{protocol = udp,
connection = ClientSocket,
host = "localhost",
port = TestServerPort,
batch_window_size = 99999,
max_udp_size = 70,
precision = s,
timestamping = false},
{ok, State2} = maybe_send(flights, [{type, departure}], #{count => 3}, State1), % fits into packet - send
{ok, State3} = maybe_send(flights, [{type, arrival}], #{count => 2}, State2), % fits into packet - send
{ok, State4} = maybe_send(flights, [{type, arrival}], #{count => 1}, State3), % doesn't fit - save for later
{ok, State5} = maybe_send(flights, [{type, arrival}], #{count => 1}, State4), % doesn't fit - save for later
{ok, {_Address, _Port, Packet1}} = gen_udp:recv(ServerSocket, 0, 9999),

?assertEqual("flights,type=departure count=3i \n"
"flights,type=arrival count=2i \n", Packet1),

?assertEqual(<<"flights,type=arrival count=1i \n"
"flights,type=arrival count=1i \n">>, State5#state.collected_metrics),

% Send the remaining metrics when the current batch window ends
{ok, State6} = exometer_info({exometer_influxdb, send}, State5),
{ok, {_Address, _Port, Packet2}} = gen_udp:recv(ServerSocket, 0, 9999),

?assertEqual("flights,type=arrival count=1i \n"
"flights,type=arrival count=1i \n", Packet2),

?assertEqual(<<>>, State6#state.collected_metrics).

name_test() ->
?assertEqual(<<"things">>, name(things)),
?assertEqual(<<"things">>, name(<<"things">>)),
?assertEqual(<<"foo_bar">>, name([foo, bar])),
?assertEqual(<<"97_98_99">>, name("abc")).

make_bin_packet(Name, Tags, Fields, Timestamping, Precision) ->
binary:list_to_bin(make_packet(Name, Tags, Fields, Timestamping, Precision)).