diff --git a/rebar.config b/rebar.config index 566639c..c2b9f0b 100644 --- a/rebar.config +++ b/rebar.config @@ -16,7 +16,7 @@ {deps, [ {getopt, {git, "https://github.com/zmstone/getopt", {tag, "v1.0.2.1"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.1"}}}, + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.2"}}}, {prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}}, {cowboy, "2.9.0"}, {jsx, "3.1.0"} diff --git a/rebar.config.script b/rebar.config.script index e8e2f86..3cd2a1c 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -11,7 +11,7 @@ IsWin32 = fun() -> win32 =:= element(1, os:type()) end, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.504"}}}, +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.1.5"}}}, IsQuicSupp = not (IsCentos6() orelse IsWin32() orelse false =/= os:getenv("BUILD_WITHOUT_QUIC") diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index 03245c6..5575d53 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -112,7 +112,7 @@ "client private key for authentication, if required by server"}, {ws, undefined, "ws", {boolean, false}, "websocket transport"}, - {quic, undefined, "quic", {boolean, false}, + {quic, undefined, "quic", {string, "false"}, "QUIC transport"}, {nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"}, {ifaddr, undefined, "ifaddr", string, @@ -206,7 +206,7 @@ "client private key for authentication, if required by server"}, {ws, undefined, "ws", {boolean, false}, "websocket transport"}, - {quic, undefined, "quic", {boolean, false}, + {quic, undefined, "quic", {string, "false"}, "QUIC transport"}, {nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"}, {ifaddr, undefined, "ifaddr", string, @@ -272,7 +272,7 @@ "client certificate for authentication, if required by server"}, {keyfile, undefined, "keyfile", string, "client private key for authentication, if required by server"}, - {quic, undefined, "quic", {boolean, false}, + {quic, undefined, "quic", {string, "false"}, "QUIC transport"}, {nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"}, {ifaddr, undefined, "ifaddr", string, @@ -435,11 +435,13 @@ start(PubSub, Opts) -> false -> CntPerWorker end, + QConnOpts = quic_opts_from_arg(Opts), WOpts = replace_opts(Opts, [{startnumber, StartNumber}, {interval, Interval}, {payload_hdrs, PayloadHdrs}, {topics_payload, TopicPayload}, - {count, Count1} + {count, Count1}, + {quic, QConnOpts} ]), WOpts1 = [{publish_signal_pid, PublishSignalPid} | WOpts], proc_lib:spawn(?MODULE, run, [self(), PubSub, WOpts1, AddrList, HostList]) @@ -465,7 +467,7 @@ prepare(PubSub, Opts) -> false -> ok end, - case proplists:get_bool(quic, Opts) of + case is_quic(Opts) of true -> maybe_start_quicer() orelse error({quic, not_supp_or_disabled}), prepare_for_quic(Opts); @@ -660,9 +662,10 @@ connect(Parent, N, PubSub, Opts) -> ClientId = client_id(PubSub, N, Opts), MqttOpts = [{clientid, ClientId}, {tcp_opts, tcp_opts(Opts)}, - {ssl_opts, ssl_opts(Opts)}] + {ssl_opts, ssl_opts(Opts)}, + {quic_opts, quic_opts(Opts, ClientId)} + ] ++ session_property_opts(Opts) - ++ quic_opts(Opts, ClientId) ++ mqtt_opts(Opts), MqttOpts1 = case PubSub of conn -> [{force_ping, true} | MqttOpts]; @@ -691,8 +694,8 @@ connect(Parent, N, PubSub, Opts) -> undefined when TopicPayloadRend == undefined -> erlang:send_after(RandomPubWaitMS, self(), publish); undefined -> - maps:foreach(fun(TopicName, #{name := TopicName}) -> - erlang:send_after(RandomPubWaitMS, self(), {publish, TopicName}) + maps:foreach(fun(TopicName, #{name := TopicName, interval_ms := DelayMs}) -> + erlang:send_after(RandomPubWaitMS + DelayMs, self(), {publish, TopicName}) end, TopicPayloadRend); _ -> %% send `publish' only when all publishers @@ -1097,7 +1100,7 @@ all_ssl_ciphers() -> -spec connect_fun(proplists:proplist()) -> FunName :: atom(). connect_fun(Opts)-> - case {proplists:get_bool(ws, Opts), proplists:get_bool(quic, Opts)} of + case {proplists:get_bool(ws, Opts), is_quic(Opts)} of {true, true} -> throw({error, "unsupported transport: ws over quic "}); {true, false} -> @@ -1294,7 +1297,24 @@ shard_addr(N, AddrList) -> Offset = N rem length(AddrList), lists:nth(Offset + 1, AddrList). -quic_opts(Opts, ClientId) when is_binary(ClientId) -> +-spec quic_opts(proplists:proplist(), binary()) -> {proplists:proplist(), proplists:proplist()}. +quic_opts(Opts, ClientId) -> + Nst = quic_opts_nst(Opts, ClientId), + case proplists:get_value(quic, Opts, undefined) of + undefined -> + []; + false -> + []; + true -> + Nst; + [] -> + Nst; + [{ConnOpts, StrmOpts}] + when is_list(ConnOpts) andalso is_list(StrmOpts)-> + {Nst++ConnOpts, StrmOpts} + end. + +quic_opts_nst(Opts, ClientId) when is_binary(ClientId) -> case proplists:get_value(nst_dets_file, Opts, undefined) of undefined -> []; _Filename -> @@ -1504,26 +1524,8 @@ parse_topics_payload(Opts) -> case proplists:get_value(topics_payload, Opts) of undefined -> undefined; Filename -> - lists:foreach( - fun({Page, Payload}) -> - ets:insert(?shared_padding_tab, {Page, Payload}) - end, [{10, << <> || N <- lists:seq(0, 10*512) >>}, - {250, << <> || N <- lists:seq(0, 250*512)>>}, - {2500, << <> || N <- lists:seq(0, 2500*512)>>}, - {25000, << <> || N <- lists:seq(0, 25000*512)>>} - ]), {ok, Content} = file:read_file(Filename), #{<<"topics">> := TopicSpecs} = jsx:decode(Content), - %% Example - %% #{<<"topics">> => - %% [#{<<"inject_timestamp">> => <<"ms">>,<<"interval_ms">> => <<"1000">>, - %% <<"name">> => <<"Topic1">>, - %% <<"payload">> => - %% #{<<"foo">> => <<"bar">>,<<"timestamp">> => <<"0">>}}, - %% #{<<"inject_timestamp">> => true,<<"interval_ms">> => <<"500">>, - %% <<"name">> => <<"Topic2">>, - %% <<"payload">> => - %% #{<<"foo">> => <<"bar">>,<<"timestamp">> => <<"0">>}}]} lists:foldl(fun(#{ <<"name">> := TopicName, <<"inject_timestamp">> := WithTS, <<"interval_ms">> := IntervalMS, @@ -1567,19 +1569,28 @@ do_render_payload(#{payload := Payload, render_field := FieldName} = Spec, Opts) Template = maps:get(FieldName, Payload, ""), SRs = [ {<<"%i">>, integer_to_binary(proplists:get_value(seq, Opts))} , {<<"%c">>, proplists:get_value(client_id, Opts)} - , {<<"%p10">>, shared_paddings(10)} - , {<<"%p2500">>, shared_paddings(2500)} - , {<<"%p25000">>, shared_paddings(25000)} + , {<<"%p">>, fun shared_paddings/1} ], - NewVal = lists:foldl(fun({Search, Replace}, Acc) -> + NewVal = lists:foldl(fun({<<"%p">>, Fun}, Acc) when is_function(Fun) -> + <<"%p", Pages/binary >> = Acc, + Fun(Pages); + ({Search, Replace}, Acc) -> binary:replace(Acc, Search, Replace, [global]) end, Template, SRs), Spec#{payload := Payload#{FieldName := NewVal}}. %% @doc shared_paddings, utilize binary ref for shallow copy -shared_paddings(Pages) -> +shared_paddings(Pages) when is_binary(Pages) -> + shared_paddings(binary_to_integer(Pages)); +shared_paddings(Pages) when is_integer(Pages) -> %% one page = 4KBytes - [{_, Payload}] = ets:lookup(?shared_padding_tab, Pages), + case ets:lookup(?shared_padding_tab, Pages) of + [{_, Payload}] -> + Payload; + [] -> + Payload = << <> || N <- lists:seq(1, Pages*512) >>, + ets:insert(?shared_padding_tab, {Pages, Payload}) + end, Payload. %% @doc send via which QUIC stream. @@ -1595,3 +1606,29 @@ maybe_prefix_payload(Payload, ClientOpts) -> PayloadHdrs -> with_payload_headers(PayloadHdrs, Payload) end. + +-spec is_quic(proplists:proplist()) -> boolean(). +is_quic(Opts) -> + proplists:get_value(quic, Opts, false) =/= false. + +quic_opts_from_arg(Opts)-> + case proplists:get_value(quic, Opts, false) of + V when is_boolean(V) -> + V; + "false" -> + false; + "true" -> + true; + V when is_list(V) -> + case filename:extension(V) of + ".eterm" -> + case file:consult(V) of + {ok, ConnOpts} -> + ConnOpts; + {error, enoent} -> + error({"--quic "++ V, no_exists}) + end; + _ -> + error("bad --quic") + end + end.