diff --git a/README.md b/README.md index b9b453a..8f5e9fd 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,8 @@ Usage: emqtt_bench conn [--help ] [-d ] [-h []] [-R []] [--force-major-gc-interval []] [--log_to []] + [--prometheus] + [--restapi] [--restapi-ip] [--restapi-port] --help help information -d, --dist enable distribution port @@ -98,7 +100,14 @@ Usage: emqtt_bench conn [--help ] [-d ] [-h []] --log_to Control where the log output goes. console: directly to the console null: quietly, don't output any logs. [default: console] - + --prometheus Enable metrics collection via Prometheus. + Usually used with --restapi to enable + scraping endpoint. + --restapi Enable REST API for monitoring and control. + For now only serves /metrics. + Can be set to IP:Port to listen on a specific IP and Port, + or just Port to listen on all interfaces on + that port. [default: disabled] ``` For example, create 50K concurrent connections at the arrival rate of 100/sec: @@ -118,6 +127,8 @@ Usage: emqtt_bench sub [--help ] [-d ] [-h []] [-p []] [ [--load-qst ] [--ifaddr ] [--prefix ] [-s []] [-l ] [--num-retry-connect []] [-R []] [--force-major-gc-interval []] [--log_to []] + [--prometheus] + [--restapi] [--restapi-ip] [--restapi-port] --help help information -d, --dist enable distribution port @@ -161,6 +172,14 @@ Usage: emqtt_bench sub [--help ] [-d ] [-h []] [-p []] [ 0] --log_to Control where the log output goes. console: directly to the console null: quietly, don't output any logs. [default: console] + --prometheus Enable metrics collection via Prometheus. + Usually used with --restapi to enable + scraping endpoint. + --restapi Enable REST API for monitoring and control. + For now only serves /metrics. + Can be set to IP:Port to listen on a specific IP and Port, + or just Port to listen on all interfaces on + that port. [default: disabled] ``` For example, create 50K concurrent connections at the arrival rate of 100/sec: @@ -181,6 +200,8 @@ Usage: emqtt_bench pub [--help ] [-d ] [-h []] [-p []] [ [-F []] [-w []] [--max-random-wait []] [--min-random-wait []] [--num-retry-connect []] [-R []] [--force-major-gc-interval []] [--log_to []] + [--prometheus] + [--restapi] [--restapi-ip] [--restapi-port] --help help information -d, --dist enable distribution port @@ -235,6 +256,14 @@ Usage: emqtt_bench pub [--help ] [-d ] [-h []] [-p []] [ [default: 0] --log_to Control where the log output goes. console: directly to the console null: quietly, don't output any logs. [default: console] + --prometheus Enable metrics collection via Prometheus. + Usually used with --restapi to enable + scraping endpoint. + --restapi Enable REST API for monitoring and control. + For now only serves /metrics. + Can be set to IP:Port to listen on a specific IP and Port, + or just Port to listen on all interfaces on + that port. [default: disabled] ``` For example, create 100 connections and each publishes messages at the rate of 100 msg/sec. diff --git a/rebar.config b/rebar.config index 8247cf2..104343e 100644 --- a/rebar.config +++ b/rebar.config @@ -16,11 +16,14 @@ {deps, [ {getopt, {git, "https://github.com/zmstone/getopt", {tag, "v1.0.2.1"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.5"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.11.0"}}}, + {prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}}, + {cowboy, "2.9.0"} ]}. {escript_name, emqtt_bench}. {escript_main_app, emqtt_bench}. +{escript_incl_apps, [prometheus, quantile_estimator, cowboy]}. {escript_emu_args, "%%! -smp true +K true +A 16 +P 16000000 +Muacnl 0 +hms 64 -env ERL_MAX_PORTS 16000000 -env ERTS_MAX_PORTS 16000000\n"}. {escript_shebang, "#!/usr/bin/env escript\n"}. {provider_hooks, [{post, [{compile, escriptize}]}]}. diff --git a/rebar.config.script b/rebar.config.script index ca89c9e..2fa06a1 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -23,8 +23,8 @@ Profiles = {profiles,[ {escript, []} , stdlib , emqtt , getopt - , gun - , cowlib + , cowboy + , prometheus | [ quicer || IsQuicSupp ] ] } , {overlay_vars_values, [ {runner_root_dir, "$(cd $(dirname $(readlink $0 || echo $0))/..; pwd -P)"} diff --git a/src/emqtt_bench.app.src b/src/emqtt_bench.app.src index 62124a8..00e6222 100644 --- a/src/emqtt_bench.app.src +++ b/src/emqtt_bench.app.src @@ -3,8 +3,9 @@ {vsn, git}, {registered, []}, {applications, [kernel, stdlib, emqtt, getopt, gun, cowlib]}, + {included_applications, [cowboy, prometheus]}, {env, []}, {maintainers,["Feng Lee "]}, {licenses, ["Apache-2.0"]}, - {links, [{"Github", "https://github.com/emqx/emqtt"}]} + {links, [{"Github", "https://github.com/emqx/emqtt-bench"}]} ]}. diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index ebbca80..9f36b71 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -144,6 +144,14 @@ "Publisher's resend interval (in seconds) if the expected " "acknowledgement for a inflight packet is not " "received within this interval. Default value 0 means no resend." + }, + {prometheus, undefined, "prometheus", undefined, + "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." + }, + {restapi, undefined, "restapi", {string, disabled}, + "Enable REST API for monitoring and control. For now only serves /metrics. " + "Can be set to IP:Port to listen on a specific IP and Port, or just Port " + "to listen on all interfaces on that port." } ]). @@ -214,6 +222,14 @@ "Control where the log output goes. " "console: directly to the console " "null: quietly, don't output any logs." + }, + {prometheus, undefined, "prometheus", undefined, + "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." + }, + {restapi, undefined, "restapi", {string, disabled}, + "Enable REST API for monitoring and control. For now only serves /metrics. " + "Can be set to IP:Port to listen on a specific IP and Port, or just Port " + "to listen on all interfaces on that port." } ]). @@ -272,6 +288,14 @@ "Control where the log output goes. " "console: directly to the console " "null: quietly, don't output any logs." + }, + {prometheus, undefined, "prometheus", undefined, + "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." + }, + {restapi, undefined, "restapi", {string, disabled}, + "Enable REST API for monitoring and control. For now only serves /metrics. " + "Can be set to IP:Port to listen on a specific IP and Port, or just Port " + "to listen on all interfaces on that port." } ]). @@ -358,6 +382,8 @@ main(conn, Opts) -> start(PubSub, Opts) -> ets:new(qoe_store, [named_table, public, ordered_set]), prepare(PubSub, Opts), init(), + maybe_init_prometheus(lists:member(prometheus, Opts)), + maybe_start_restapi(proplists:get_value(restapi, Opts)), IfAddr = proplists:get_value(ifaddr, Opts), Host = proplists:get_value(host, Opts), Rate = proplists:get_value(conn_rate, Opts), @@ -567,9 +593,13 @@ get_counter(CntName) -> [{CntName, Idx}] = ets:lookup(?cnt_map, CntName), counters:get(cnt_ref(), Idx). -inc_counter(CntName) -> - inc_counter(CntName, 1). -inc_counter(CntName, Inc) -> +inc_counter(Prometheus, CntName) -> + inc_counter(Prometheus, CntName, 1). +inc_counter(false, CntName, Inc) -> + [{CntName, Idx}] = ets:lookup(?cnt_map, CntName), + counters:add(cnt_ref(), Idx, Inc); +inc_counter(true, CntName, Inc) -> + prometheus_counter:inc(CntName, Inc), [{CntName, Idx}] = ets:lookup(?cnt_map, CntName), counters:add(cnt_ref(), Idx, Inc). @@ -606,6 +636,7 @@ run(Parent, I, N, PubSub, Opts0, AddrList, HostList) -> connect(Parent, N, PubSub, Opts) -> process_flag(trap_exit, true), rand:seed(exsplus, erlang:timestamp()), + Prometheus = proplists:get_value(prometheus, Opts, false), GoSignalPid = proplists:get_value(publish_signal_pid, Opts), SendGoSignal = proplists:get_value(send_go_signal, Opts), MRef = case is_pid(GoSignalPid) of @@ -643,7 +674,7 @@ connect(Parent, N, PubSub, Opts) -> ContinueFn = fun() -> loop(Parent, N, Client, PubSub, loop_opts(AllOpts)) end, case ConnRet of {ok, _Props} -> - inc_counter(connect_succ), + inc_counter(Prometheus, connect_succ), Res = case PubSub of conn -> ok; @@ -661,18 +692,18 @@ connect(Parent, N, PubSub, Opts) -> {error, _SubscribeError} -> maybe_retry(Parent, N, PubSub, Opts, ContinueFn); _ -> - PubSub =:= sub andalso inc_counter(sub), + PubSub =:= sub andalso inc_counter(Prometheus, sub), loop(Parent, N, Client, PubSub, loop_opts(AllOpts)) end; {error, {transport_down, Reason} = _QUICFail} when Reason == connection_idle; Reason == connection_refused; Reason == connection_timeout -> - inc_counter(Reason), + inc_counter(Prometheus, Reason), maybe_retry(Parent, N, PubSub, Opts, ContinueFn); {error, {transport_down, _Other = QUICFail}} -> io:format("Error: unknown QUIC transport_down ~p~n", [QUICFail]), - inc_counter(connect_fail), + inc_counter(Prometheus, connect_fail), {error, QUICFail}; {error, Error} -> io:format("client(~w): connect error - ~p~n", [N, Error]), @@ -680,21 +711,23 @@ connect(Parent, N, PubSub, Opts) -> end. maybe_retry(Parent, N, PubSub, Opts, ContinueFn) -> + Prometheus = proplists:get_value(prometheus, Opts, false), MaxRetries = proplists:get_value(num_retry_connect, Opts, 0), Retries = proplists:get_value(connection_attempts, Opts, 0), case Retries >= MaxRetries of true -> - inc_counter(connect_fail), - PubSub =:= sub andalso inc_counter(sub_fail), + inc_counter(Prometheus, connect_fail), + PubSub =:= sub andalso inc_counter(Prometheus, sub_fail), ContinueFn(); false -> - inc_counter(connect_retried), + inc_counter(Prometheus, connect_retried), NOpts = proplists:delete(connection_attempts, Opts), connect(Parent, N, PubSub, [{connection_attempts, Retries + 1} | NOpts]) end. loop(Parent, N, Client, PubSub, Opts) -> Interval = proplists:get_value(interval_of_msg, Opts, 0), + Prometheus = proplists:get_value(prometheus, Opts, false), Idle = max(Interval * 2, 500), MRef = proplists:get_value(publish_signal_mref, Opts), receive @@ -708,12 +741,12 @@ loop(Parent, N, Client, PubSub, Opts) -> %% this call hangs if emqtt inflight is full case publish(Client, Opts) of ok -> - inc_counter(pub), - ok = schedule_next_publish(Interval), + inc_counter(Prometheus, pub), + ok = schedule_next_publish(Prometheus, Interval), ok; {error, Reason} -> %% TODO: schedule next publish for retry ? - inc_counter(pub_fail), + inc_counter(Prometheus, pub_fail), io:format("client(~w): publish error - ~p~n", [N, Reason]) end, loop(Parent, N, Client, PubSub, Opts); @@ -722,8 +755,8 @@ loop(Parent, N, Client, PubSub, Opts) -> exit(normal) end; {publish, #{payload := Payload}} -> - inc_counter(recv), - maybe_check_payload_hdrs(Payload, proplists:get_value(payload_hdrs, Opts, [])), + inc_counter(Prometheus, recv), + maybe_check_payload_hdrs(Prometheus, Payload, proplists:get_value(payload_hdrs, Opts, [])), loop(Parent, N, Client, PubSub, Opts); {'EXIT', _Client, normal} -> ok; @@ -736,13 +769,13 @@ loop(Parent, N, Client, PubSub, Opts) -> io:format("client(~w): EXIT for ~p~n", [N, Reason]); {puback, _} -> %% Publish success for QoS 1 (recv puback) and 2 (recv pubcomp) - inc_counter(pub_succ), + inc_counter(Prometheus, pub_succ), loop(Parent, N, Client, PubSub, Opts); {disconnected, ReasonCode, _Meta} -> io:format("client(~w): disconnected with reason ~w: ~p~n", [N, ReasonCode, emqtt:reason_code_name(ReasonCode)]); {connected, _Props} -> - inc_counter(reconnect_succ), + inc_counter(Prometheus, reconnect_succ), IsSessionPresent = (1 == proplists:get_value(session_present, emqtt:info(Client))), %% @TODO here we do not really check the subscribe PubSub =:= sub andalso not IsSessionPresent andalso subscribe(Client, N, Opts), @@ -786,13 +819,13 @@ bump_publish_attempt_counter() -> _ = put(success_publish_count, NewCount), NewCount. -schedule_next_publish(Interval) -> +schedule_next_publish(Prometheus, Interval) -> PubAttempted = bump_publish_attempt_counter(), BeginTime = get_publish_begin_time(), NextTime = BeginTime + PubAttempted * Interval, NowT = erlang:monotonic_time(millisecond), Remain = NextTime - NowT, - Interval > 0 andalso Remain < 0 andalso inc_counter(pub_overrun), + Interval > 0 andalso Remain < 0 andalso inc_counter(Prometheus, pub_overrun), case Remain > 0 of true -> _ = erlang:send_after(Remain, self(), publish); false -> self() ! publish @@ -818,6 +851,7 @@ pub_limit_fun_init(N) when is_integer(N), N > 0 -> end. subscribe(Client, N, Opts) -> + Prometheus = proplists:get_value(prometheus, Opts, false), Qos = proplists:get_value(qos, Opts), Res = emqtt:subscribe(Client, [{Topic, Qos} || Topic <- topics_opt(Opts)]), case Res of @@ -833,6 +867,9 @@ subscribe(Client, N, Opts) -> ElapsedHandshake = HSTs - StartTs, ElapsedConn = ConnTs - StartTs, ElapsedSub = SubTs - StartTs, + histogram_observe(Prometheus, mqtt_client_handshake_duration, ElapsedHandshake), + histogram_observe(Prometheus, mqtt_client_connect_duration, ElapsedConn), + histogram_observe(Prometheus, mqtt_client_subscribe_duration, ElapsedSub), true = ets:insert(qoe_store, {proplists:get_value(client_id, Opts), {ElapsedHandshake, ElapsedConn, ElapsedSub} }), @@ -1081,7 +1118,8 @@ replace_opts(Opts, NewOpts) -> %% trim opts to save proc stack mem. loop_opts(Opts) -> - lists:filter(fun({K,__V}) -> + lists:filter(fun + ({K,__V}) -> lists:member(K, [ interval_of_msg , payload , payload_size @@ -1096,7 +1134,8 @@ loop_opts(Opts) -> , seq , publish_signal_mref , pub_start_wait - ]) + ]); + (K) -> K =:= prometheus end, Opts). -spec maybe_start_quicer() -> boolean(). @@ -1231,22 +1270,71 @@ counters() -> Idxs = lists:seq(2, length(Names) + 1), lists:zip(Names, Idxs). +maybe_init_prometheus(false) -> + ok; +maybe_init_prometheus(true) -> + Collectors = [ prometheus_boolean, + prometheus_counter, + prometheus_gauge, + prometheus_histogram, + prometheus_quantile_summary, + prometheus_summary], + application:set_env(prometheus, collectors, Collectors), + {ok, _} = application:ensure_all_started(prometheus), + Counters = [ publish_latency + , recv + , sub + , sub_fail + , pub + , pub_fail + , pub_overrun + , pub_succ + , connect_succ + , connect_fail + , connect_retried + , reconnect_succ + , unreachable + , connection_refused + , connection_timeout + , connection_idle + ], + lists:foreach( + fun(Cnt) -> + prometheus_counter:declare([{name, Cnt}, {help, atom_to_list(Cnt)}]) + end, Counters), + prometheus_histogram:declare([{name, mqtt_client_handshake_duration}, + {labels, []}, + {buckets, [1, 3, 5, 10, 20]}, + {help, "Handshake duration of MQTT client"}]), + prometheus_histogram:declare([{name, mqtt_client_connect_duration}, + {labels, []}, + {buckets, [1, 10, 25, 50, 100]}, + {help, "Connect duration of MQTT client"}]), + prometheus_histogram:declare([{name, mqtt_client_subscribe_duration}, + {labels, []}, + {buckets, [1, 10, 25, 50, 100]}, + {help, "Subscribe duration of MQTT client"}]), + prometheus_histogram:declare([{name, e2e_latency}, + {buckets, [1, 5, 10, 25, 50, 100, 500, 1000]}, + {help, "End-to-end latency"}]). + %% @doc Check received payload headers --spec maybe_check_payload_hdrs(Payload :: binary(), Hdrs :: [string()]) -> ok. -maybe_check_payload_hdrs({template, _Bin}, _) -> +-spec maybe_check_payload_hdrs(Prometheus :: boolean(), Payload :: binary(), Hdrs :: [string()]) -> ok. +maybe_check_payload_hdrs(_, {template, _Bin}, _) -> ok; -maybe_check_payload_hdrs(_Bin, []) -> +maybe_check_payload_hdrs(_, _Bin, []) -> ok; -maybe_check_payload_hdrs(<< TS:64/integer, BinL/binary >>, [?hdr_ts | RL]) -> +maybe_check_payload_hdrs(Prometheus, << TS:64/integer, BinL/binary >>, [?hdr_ts | RL]) -> E2ELatency = os:system_time(millisecond) - TS, - E2ELatency > 0 andalso inc_counter(publish_latency, E2ELatency), - maybe_check_payload_hdrs(BinL, RL); -maybe_check_payload_hdrs(<< Cnt:64/integer, BinL/binary >>, [?hdr_cnt64 | RL]) -> + E2ELatency > 0 andalso inc_counter(Prometheus, publish_latency, E2ELatency), + E2ELatency > 0 andalso histogram_observe(Prometheus, e2e_latency, E2ELatency), + maybe_check_payload_hdrs(Prometheus, BinL, RL); +maybe_check_payload_hdrs(Prometheus, << Cnt:64/integer, BinL/binary >>, [?hdr_cnt64 | RL]) -> case put(payload_hdr_cnt64, Cnt) of undefined -> ok; Old when Cnt - 1 == Old -> - maybe_check_payload_hdrs(BinL, RL); + maybe_check_payload_hdrs(Prometheus, BinL, RL); Old -> throw({err_payload_hdr_cnt64, Old, Cnt}) end. @@ -1288,3 +1376,36 @@ validate_payload_hdrs([Hdr | T]) -> false -> error({unsupp_payload_hdr, Hdr}) end. + +maybe_start_restapi(disabled) -> + ok; +maybe_start_restapi("disabled") -> + ok; +maybe_start_restapi(RestAPI) -> + {IP, Port} = + case string:split(RestAPI, ":") of + [IP0, Port0] -> + {Port1, _} = string:to_integer(Port0), + {ok, IP1} = inet:parse_ipv4_address(IP0), + {IP1, Port1}; + [Port0] -> + {Port1, _} = string:to_integer(Port0), + {{0, 0, 0, 0}, Port1} + end, + TransportOpts = #{ip => IP, port => Port}, + Env = #{dispatch => dispatch()}, + ProtocolOpts = #{env => Env}, + application:ensure_all_started(cowboy), + cowboy:start_clear(rest_api, maps:to_list(TransportOpts), ProtocolOpts). + +dispatch() -> + cowboy_router:compile([{'_', routes()}]). + +routes() -> + [ {"/metrics", emqtt_bench_http_metrics, []} + ]. + +histogram_observe(false, _, _) -> + ok; +histogram_observe(true, Metric, Value) -> + prometheus_histogram:observe(Metric, Value). diff --git a/src/http/emqtt_bench_http_metrics.erl b/src/http/emqtt_bench_http_metrics.erl new file mode 100644 index 0000000..e40431d --- /dev/null +++ b/src/http/emqtt_bench_http_metrics.erl @@ -0,0 +1,38 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqtt_bench_http_metrics). + +-export([ init/2 + , allowed_methods/2 + , content_types_provided/2 + , text/2 + ]). + +init(Req, State) -> + {cowboy_rest, Req, State}. + +allowed_methods(Req, State) -> + {[<<"GET">>], Req, State}. + +content_types_provided(Req, State) -> + { [{<<"text/plain">>, text}] + , Req + , State + }. + +text(Req, State) -> + Body = prometheus_text_format:format(), + {Body, Req, State}.