From f485ea4b31e33b296e6f4fbc938a2181fada6313 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 5 Sep 2024 11:27:56 +0200 Subject: [PATCH] feat: instrument with prometheus --- README.md | 28 ++++++- rebar.config | 5 +- rebar.config.script | 4 +- src/emqtt_bench.app.src | 3 +- src/emqtt_bench.erl | 180 ++++++++++++++++++++++++++++++++++------ 5 files changed, 188 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index b9b453a..2f76e7c 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,8 @@ Usage: emqtt_bench conn [--help ] [-d ] [-h []] [-R []] [--force-major-gc-interval []] [--log_to []] + [--prometheus] + [--restapi] [--restapi-ip] [--restapi-port] --help help information -d, --dist enable distribution port @@ -98,7 +100,13 @@ Usage: emqtt_bench conn [--help ] [-d ] [-h []] --log_to Control where the log output goes. console: directly to the console null: quietly, don't output any logs. [default: console] - + --prometheus Enable metrics collection via Prometheus. + Usually used with --restapi to enable + scraping endpoint. [default: false] + --restapi Enable REST API for monitoring and control. + For now only serves /metrics. [default: false] + --restapi-ip REST API listen IP address [default: 0.0.0.0] + --restapi-port REST API listen port [default: 9090] ``` For example, create 50K concurrent connections at the arrival rate of 100/sec: @@ -118,6 +126,8 @@ Usage: emqtt_bench sub [--help ] [-d ] [-h []] [-p []] [ [--load-qst ] [--ifaddr ] [--prefix ] [-s []] [-l ] [--num-retry-connect []] [-R []] [--force-major-gc-interval []] [--log_to []] + [--prometheus] + [--restapi] [--restapi-ip] [--restapi-port] --help help information -d, --dist enable distribution port @@ -161,6 +171,13 @@ Usage: emqtt_bench sub [--help ] [-d ] [-h []] [-p []] [ 0] --log_to Control where the log output goes. console: directly to the console null: quietly, don't output any logs. [default: console] + --prometheus Enable metrics collection via Prometheus. 
+ Usually used with --restapi to enable + scraping endpoint. [default: false] + --restapi Enable REST API for monitoring and control. + For now only serves /metrics. [default: false] + --restapi-ip REST API listen IP address [default: 0.0.0.0] + --restapi-port REST API listen port [default: 9090] ``` For example, create 50K concurrent connections at the arrival rate of 100/sec: @@ -181,6 +198,8 @@ Usage: emqtt_bench pub [--help ] [-d ] [-h []] [-p []] [ [-F []] [-w []] [--max-random-wait []] [--min-random-wait []] [--num-retry-connect []] [-R []] [--force-major-gc-interval []] [--log_to []] + [--prometheus] + [--restapi] [--restapi-ip] [--restapi-port] --help help information -d, --dist enable distribution port @@ -235,6 +254,13 @@ Usage: emqtt_bench pub [--help ] [-d ] [-h []] [-p []] [ [default: 0] --log_to Control where the log output goes. console: directly to the console null: quietly, don't output any logs. [default: console] + --prometheus Enable metrics collection via Prometheus. + Usually used with --restapi to enable + scraping endpoint. [default: false] + --restapi Enable REST API for monitoring and control. + For now only serves /metrics. [default: false] + --restapi-ip REST API listen IP address [default: 0.0.0.0] + --restapi-port REST API listen port [default: 9090] ``` For example, create 100 connections and each publishes messages at the rate of 100 msg/sec. diff --git a/rebar.config b/rebar.config index 8247cf2..104343e 100644 --- a/rebar.config +++ b/rebar.config @@ -16,11 +16,14 @@ {deps, [ {getopt, {git, "https://github.com/zmstone/getopt", {tag, "v1.0.2.1"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.5"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.11.0"}}}, + {prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}}, + {cowboy, "2.9.0"} ]}. {escript_name, emqtt_bench}. {escript_main_app, emqtt_bench}. +{escript_incl_apps, [prometheus, quantile_estimator, cowboy]}. 
{escript_emu_args, "%%! -smp true +K true +A 16 +P 16000000 +Muacnl 0 +hms 64 -env ERL_MAX_PORTS 16000000 -env ERTS_MAX_PORTS 16000000\n"}. {escript_shebang, "#!/usr/bin/env escript\n"}. {provider_hooks, [{post, [{compile, escriptize}]}]}. diff --git a/rebar.config.script b/rebar.config.script index ca89c9e..2fa06a1 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -23,8 +23,8 @@ Profiles = {profiles,[ {escript, []} , stdlib , emqtt , getopt - , gun - , cowlib + , cowboy + , prometheus | [ quicer || IsQuicSupp ] ] } , {overlay_vars_values, [ {runner_root_dir, "$(cd $(dirname $(readlink $0 || echo $0))/..; pwd -P)"} diff --git a/src/emqtt_bench.app.src b/src/emqtt_bench.app.src index 62124a8..00e6222 100644 --- a/src/emqtt_bench.app.src +++ b/src/emqtt_bench.app.src @@ -3,8 +3,9 @@ {vsn, git}, {registered, []}, {applications, [kernel, stdlib, emqtt, getopt, gun, cowlib]}, + {included_applications, [cowboy, prometheus]}, {env, []}, {maintainers,["Feng Lee "]}, {licenses, ["Apache-2.0"]}, - {links, [{"Github", "https://github.com/emqx/emqtt"}]} + {links, [{"Github", "https://github.com/emqx/emqtt-bench"}]} ]}. diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index ebbca80..d9f4b4d 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -144,6 +144,18 @@ "Publisher's resend interval (in seconds) if the expected " "acknowledgement for a inflight packet is not " "received within this interval. Default value 0 means no resend." + }, + {prometheus, undefined, "prometheus", {boolean, false}, + "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." + }, + {restapi, undefined, "restapi", {boolean, false}, + "Enable REST API for monitoring and control. For now only serves /metrics." + }, + {restapi_ip, undefined, "restapi-ip", {string, "0.0.0.0"}, + "REST API listen IP address" + }, + {restapi_port, undefined, "restapi-port", {integer, 9090}, + "REST API listen port" } ]). 
@@ -214,6 +226,18 @@ "Control where the log output goes. " "console: directly to the console " "null: quietly, don't output any logs." + }, + {prometheus, undefined, "prometheus", {boolean, false}, + "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." + }, + {restapi, undefined, "restapi", {boolean, false}, + "Enable REST API for monitoring and control. For now only serves /metrics." + }, + {restapi_ip, undefined, "restapi-ip", {string, "0.0.0.0"}, + "REST API listen IP address" + }, + {restapi_port, undefined, "restapi-port", {integer, 9090}, + "REST API listen port" } ]). @@ -272,6 +296,18 @@ "Control where the log output goes. " "console: directly to the console " "null: quietly, don't output any logs." + }, + {prometheus, undefined, "prometheus", {boolean, false}, + "Enable metrics collection via Prometheus. Usually used with --restapi to enable scraping endpoint." + }, + {restapi, undefined, "restapi", {boolean, false}, + "Enable REST API for monitoring and control. For now only serves /metrics." + }, + {restapi_ip, undefined, "restapi-ip", {string, "0.0.0.0"}, + "REST API listen IP address" + }, + {restapi_port, undefined, "restapi-port", {integer, 9090}, + "REST API listen port" } ]). @@ -358,6 +394,8 @@ main(conn, Opts) -> start(PubSub, Opts) -> ets:new(qoe_store, [named_table, public, ordered_set]), prepare(PubSub, Opts), init(), + maybe_init_prometheus(proplists:get_value(prometheus, Opts, false)), + maybe_start_restapi(proplists:get_value(restapi, Opts, false), Opts), IfAddr = proplists:get_value(ifaddr, Opts), Host = proplists:get_value(host, Opts), Rate = proplists:get_value(conn_rate, Opts), @@ -567,9 +605,13 @@ get_counter(CntName) -> [{CntName, Idx}] = ets:lookup(?cnt_map, CntName), counters:get(cnt_ref(), Idx). -inc_counter(CntName) -> - inc_counter(CntName, 1). -inc_counter(CntName, Inc) -> +inc_counter(Prometheus, CntName) -> + inc_counter(Prometheus, CntName, 1). 
+inc_counter(false, CntName, Inc) -> + [{CntName, Idx}] = ets:lookup(?cnt_map, CntName), + counters:add(cnt_ref(), Idx, Inc); +inc_counter(true, CntName, Inc) -> + prometheus_counter:inc(CntName, Inc), [{CntName, Idx}] = ets:lookup(?cnt_map, CntName), counters:add(cnt_ref(), Idx, Inc). @@ -606,6 +648,7 @@ run(Parent, I, N, PubSub, Opts0, AddrList, HostList) -> connect(Parent, N, PubSub, Opts) -> process_flag(trap_exit, true), rand:seed(exsplus, erlang:timestamp()), + Prometheus = proplists:get_value(prometheus, Opts, false), GoSignalPid = proplists:get_value(publish_signal_pid, Opts), SendGoSignal = proplists:get_value(send_go_signal, Opts), MRef = case is_pid(GoSignalPid) of @@ -643,7 +686,7 @@ connect(Parent, N, PubSub, Opts) -> ContinueFn = fun() -> loop(Parent, N, Client, PubSub, loop_opts(AllOpts)) end, case ConnRet of {ok, _Props} -> - inc_counter(connect_succ), + inc_counter(Prometheus, connect_succ), Res = case PubSub of conn -> ok; @@ -661,18 +704,18 @@ connect(Parent, N, PubSub, Opts) -> {error, _SubscribeError} -> maybe_retry(Parent, N, PubSub, Opts, ContinueFn); _ -> - PubSub =:= sub andalso inc_counter(sub), + PubSub =:= sub andalso inc_counter(Prometheus, sub), loop(Parent, N, Client, PubSub, loop_opts(AllOpts)) end; {error, {transport_down, Reason} = _QUICFail} when Reason == connection_idle; Reason == connection_refused; Reason == connection_timeout -> - inc_counter(Reason), + inc_counter(Prometheus, Reason), maybe_retry(Parent, N, PubSub, Opts, ContinueFn); {error, {transport_down, _Other = QUICFail}} -> io:format("Error: unknown QUIC transport_down ~p~n", [QUICFail]), - inc_counter(connect_fail), + inc_counter(Prometheus, connect_fail), {error, QUICFail}; {error, Error} -> io:format("client(~w): connect error - ~p~n", [N, Error]), @@ -680,21 +723,23 @@ connect(Parent, N, PubSub, Opts) -> end. 
maybe_retry(Parent, N, PubSub, Opts, ContinueFn) -> + Prometheus = proplists:get_value(prometheus, Opts, false), MaxRetries = proplists:get_value(num_retry_connect, Opts, 0), Retries = proplists:get_value(connection_attempts, Opts, 0), case Retries >= MaxRetries of true -> - inc_counter(connect_fail), - PubSub =:= sub andalso inc_counter(sub_fail), + inc_counter(Prometheus, connect_fail), + PubSub =:= sub andalso inc_counter(Prometheus, sub_fail), ContinueFn(); false -> - inc_counter(connect_retried), + inc_counter(Prometheus, connect_retried), NOpts = proplists:delete(connection_attempts, Opts), connect(Parent, N, PubSub, [{connection_attempts, Retries + 1} | NOpts]) end. loop(Parent, N, Client, PubSub, Opts) -> Interval = proplists:get_value(interval_of_msg, Opts, 0), + Prometheus = proplists:get_value(prometheus, Opts, false), Idle = max(Interval * 2, 500), MRef = proplists:get_value(publish_signal_mref, Opts), receive @@ -708,12 +753,12 @@ loop(Parent, N, Client, PubSub, Opts) -> %% this call hangs if emqtt inflight is full case publish(Client, Opts) of ok -> - inc_counter(pub), - ok = schedule_next_publish(Interval), + inc_counter(Prometheus, pub), + ok = schedule_next_publish(Prometheus, Interval), ok; {error, Reason} -> %% TODO: schedule next publish for retry ? 
- inc_counter(pub_fail), + inc_counter(Prometheus, pub_fail), io:format("client(~w): publish error - ~p~n", [N, Reason]) end, loop(Parent, N, Client, PubSub, Opts); @@ -722,8 +767,8 @@ loop(Parent, N, Client, PubSub, Opts) -> exit(normal) end; {publish, #{payload := Payload}} -> - inc_counter(recv), - maybe_check_payload_hdrs(Payload, proplists:get_value(payload_hdrs, Opts, [])), + inc_counter(Prometheus, recv), + maybe_check_payload_hdrs(Prometheus, Payload, proplists:get_value(payload_hdrs, Opts, [])), loop(Parent, N, Client, PubSub, Opts); {'EXIT', _Client, normal} -> ok; @@ -736,13 +781,13 @@ loop(Parent, N, Client, PubSub, Opts) -> io:format("client(~w): EXIT for ~p~n", [N, Reason]); {puback, _} -> %% Publish success for QoS 1 (recv puback) and 2 (recv pubcomp) - inc_counter(pub_succ), + inc_counter(Prometheus, pub_succ), loop(Parent, N, Client, PubSub, Opts); {disconnected, ReasonCode, _Meta} -> io:format("client(~w): disconnected with reason ~w: ~p~n", [N, ReasonCode, emqtt:reason_code_name(ReasonCode)]); {connected, _Props} -> - inc_counter(reconnect_succ), + inc_counter(Prometheus, reconnect_succ), IsSessionPresent = (1 == proplists:get_value(session_present, emqtt:info(Client))), %% @TODO here we do not really check the subscribe PubSub =:= sub andalso not IsSessionPresent andalso subscribe(Client, N, Opts), @@ -786,13 +831,13 @@ bump_publish_attempt_counter() -> _ = put(success_publish_count, NewCount), NewCount. -schedule_next_publish(Interval) -> +schedule_next_publish(Prometheus, Interval) -> PubAttempted = bump_publish_attempt_counter(), BeginTime = get_publish_begin_time(), NextTime = BeginTime + PubAttempted * Interval, NowT = erlang:monotonic_time(millisecond), Remain = NextTime - NowT, - Interval > 0 andalso Remain < 0 andalso inc_counter(pub_overrun), + Interval > 0 andalso Remain < 0 andalso inc_counter(Prometheus, pub_overrun), case Remain > 0 of true -> _ = erlang:send_after(Remain, self(), publish); false -> self() ! 
publish @@ -818,6 +863,7 @@ pub_limit_fun_init(N) when is_integer(N), N > 0 -> end. subscribe(Client, N, Opts) -> + Prometheus = proplists:get_value(prometheus, Opts, false), Qos = proplists:get_value(qos, Opts), Res = emqtt:subscribe(Client, [{Topic, Qos} || Topic <- topics_opt(Opts)]), case Res of @@ -833,6 +879,9 @@ subscribe(Client, N, Opts) -> ElapsedHandshake = HSTs - StartTs, ElapsedConn = ConnTs - StartTs, ElapsedSub = SubTs - StartTs, + histogram_observe(Prometheus, mqtt_client_handshake_duration, ElapsedHandshake), + histogram_observe(Prometheus, mqtt_client_connect_duration, ElapsedConn), + histogram_observe(Prometheus, mqtt_client_subscribe_duration, ElapsedSub), true = ets:insert(qoe_store, {proplists:get_value(client_id, Opts), {ElapsedHandshake, ElapsedConn, ElapsedSub} }), @@ -1096,6 +1145,7 @@ loop_opts(Opts) -> , seq , publish_signal_mref , pub_start_wait + , prometheus ]) end, Opts). @@ -1231,22 +1281,71 @@ counters() -> Idxs = lists:seq(2, length(Names) + 1), lists:zip(Names, Idxs). 
+maybe_init_prometheus(false) -> %% metrics collection not requested: nothing to set up
+    ok;
+maybe_init_prometheus(true) -> %% start prometheus, then declare every counter and histogram
+    Collectors = [ prometheus_boolean, %% stored in the prometheus app env (key: collectors) below
+                   prometheus_counter,
+                   prometheus_gauge,
+                   prometheus_histogram,
+                   prometheus_quantile_summary,
+                   prometheus_summary],
+    application:set_env(prometheus, collectors, Collectors), %% set before the application is started
+    {ok, _} = application:ensure_all_started(prometheus), %% badmatch crash if prometheus cannot start
+    Counters = [ publish_latency %% one counter per atom; inc_counter/3 passes the same atoms to prometheus_counter:inc/2
+               , recv
+               , sub
+               , sub_fail
+               , pub
+               , pub_fail
+               , pub_overrun
+               , pub_succ
+               , connect_succ
+               , connect_fail
+               , connect_retried
+               , reconnect_succ
+               , unreachable
+               , connection_refused
+               , connection_timeout
+               , connection_idle
+               ],
+    lists:foreach(
+      fun(Cnt) ->
+              prometheus_counter:declare([{name, Cnt}, {help, atom_to_list(Cnt)}]) %% help text is the counter name itself
+      end, Counters),
+    prometheus_histogram:declare([{name, mqtt_client_handshake_duration}, %% observed in subscribe/3
+                                  {labels, []},
+                                  {buckets, [1, 3, 5, 10, 20]},
+                                  {help, "Handshake duration of MQTT client"}]),
+    prometheus_histogram:declare([{name, mqtt_client_connect_duration}, %% observed in subscribe/3
+                                  {labels, []},
+                                  {buckets, [1, 10, 25, 50, 100]},
+                                  {help, "Connect duration of MQTT client"}]),
+    prometheus_histogram:declare([{name, mqtt_client_subscribe_duration}, %% observed in subscribe/3
+                                  {labels, []},
+                                  {buckets, [1, 10, 25, 50, 100]},
+                                  {help, "Subscribe duration of MQTT client"}]),
+    prometheus_histogram:declare([{name, e2e_latency}, %% observed in maybe_check_payload_hdrs/3
+                                  {buckets, [1, 5, 10, 25, 50, 100, 500, 1000]},
+                                  {help, "End-to-end latency"}]).
+
 %% @doc Check received payload headers
--spec maybe_check_payload_hdrs(Payload :: binary(), Hdrs :: [string()]) -> ok.
--maybe_check_payload_hdrs({template, _Bin}, _) ->
+-spec maybe_check_payload_hdrs(Prometheus :: boolean(), Payload :: binary(), Hdrs :: [string()]) -> ok.
+maybe_check_payload_hdrs(_, {template, _Bin}, _) -> %% template payloads are not inspected
     ok;
-maybe_check_payload_hdrs(_Bin, []) ->
+maybe_check_payload_hdrs(_, _Bin, []) -> %% no headers left to check
     ok;
-maybe_check_payload_hdrs(<< TS:64/integer, BinL/binary >>, [?hdr_ts | RL]) ->
+maybe_check_payload_hdrs(Prometheus, << TS:64/integer, BinL/binary >>, [?hdr_ts | RL]) ->
     E2ELatency = os:system_time(millisecond) - TS,
-    E2ELatency > 0 andalso inc_counter(publish_latency, E2ELatency),
-    maybe_check_payload_hdrs(BinL, RL);
-maybe_check_payload_hdrs(<< Cnt:64/integer, BinL/binary >>, [?hdr_cnt64 | RL]) ->
+    E2ELatency > 0 andalso inc_counter(Prometheus, publish_latency, E2ELatency),
+    E2ELatency > 0 andalso histogram_observe(Prometheus, e2e_latency, E2ELatency),
+    maybe_check_payload_hdrs(Prometheus, BinL, RL);
+maybe_check_payload_hdrs(Prometheus, << Cnt:64/integer, BinL/binary >>, [?hdr_cnt64 | RL]) ->
     case put(payload_hdr_cnt64, Cnt) of
         undefined ->
             ok;
         Old when Cnt - 1 == Old ->
-            maybe_check_payload_hdrs(BinL, RL);
+            maybe_check_payload_hdrs(Prometheus, BinL, RL);
         Old ->
             throw({err_payload_hdr_cnt64, Old, Cnt})
     end.
@@ -1288,3 +1387,40 @@ validate_payload_hdrs([Hdr | T]) ->
         false ->
             error({unsupp_payload_hdr, Hdr})
     end.
+
+%% @doc Start the REST API listener when `--restapi' is enabled.
+%% Crashes if the listen address cannot be parsed, or if cowboy or the
+%% listener fails to start, so that a broken endpoint is never silent.
+maybe_start_restapi(false, _Opts) ->
+    ok;
+maybe_start_restapi(true, Opts) ->
+    %% Fallbacks mirror the defaults declared for --restapi-ip/--restapi-port.
+    IP = proplists:get_value(restapi_ip, Opts, "0.0.0.0"),
+    ParsedIP = case inet_parse:address(IP) of
+                   {ok, IPAddr} -> IPAddr;
+                   {error, Reason} -> error({bad_ip_address, {IP, Reason}})
+               end,
+    Port = proplists:get_value(restapi_port, Opts, 9090),
+    TransportOpts = [{ip, ParsedIP}, {port, Port}],
+    ProtocolOpts = #{env => #{dispatch => dispatch()}},
+    {ok, _Apps} = application:ensure_all_started(cowboy),
+    %% Assert on the result: e.g. a port already in use must not go unnoticed.
+    {ok, _Listener} = cowboy:start_clear(rest_api, TransportOpts, ProtocolOpts),
+    ok.
+
+%% Compile the cowboy routing table for the REST API (any host).
+dispatch() ->
+    cowboy_router:compile([{'_', routes()}]).
+
+%% Routes served by the REST API listener.
+%% NOTE(review): the emqtt_bench_http_metrics handler module is not touched by
+%% this change -- confirm it exists in the tree, otherwise /metrics cannot work.
+routes() ->
+    [ {"/metrics", emqtt_bench_http_metrics, []}
+    ].
+
+%% Record Value in the histogram Metric; a no-op when Prometheus is disabled.
+histogram_observe(false, _, _) ->
+    ok;
+histogram_observe(true, Metric, Value) ->
+    prometheus_histogram:observe(Metric, Value).