Skip to content

Commit

Permalink
feat(quic): quic fine tune
Browse files Browse the repository at this point in the history
  • Loading branch information
qzhuyan committed Oct 7, 2024
1 parent be5e02a commit deb45ad
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 12 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

{deps, [
{getopt, {git, "https://github.com/zmstone/getopt", {tag, "v1.0.2.1"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.1"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.2"}}},
{prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}},
{cowboy, "2.9.0"},
{jsx, "3.1.0"}
Expand Down
68 changes: 57 additions & 11 deletions src/emqtt_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
"client private key for authentication, if required by server"},
{ws, undefined, "ws", {boolean, false},
"websocket transport"},
{quic, undefined, "quic", {boolean, false},
{quic, undefined, "quic", {string, "false"},
"QUIC transport"},
{nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"},
{ifaddr, undefined, "ifaddr", string,
Expand Down Expand Up @@ -206,7 +206,7 @@
"client private key for authentication, if required by server"},
{ws, undefined, "ws", {boolean, false},
"websocket transport"},
{quic, undefined, "quic", {boolean, false},
{quic, undefined, "quic", {string, "false"},
"QUIC transport"},
{nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"},
{ifaddr, undefined, "ifaddr", string,
Expand Down Expand Up @@ -272,7 +272,7 @@
"client certificate for authentication, if required by server"},
{keyfile, undefined, "keyfile", string,
"client private key for authentication, if required by server"},
{quic, undefined, "quic", {boolean, false},
{quic, undefined, "quic", {string, "false"},
"QUIC transport"},
{nst_dets_file, undefined, "load-qst", string, "load quic session tickets from dets file"},
{ifaddr, undefined, "ifaddr", string,
Expand Down Expand Up @@ -435,11 +435,13 @@ start(PubSub, Opts) ->
false ->
CntPerWorker
end,
QConnOpts = quic_opts_from_arg(Opts),
WOpts = replace_opts(Opts, [{startnumber, StartNumber},
{interval, Interval},
{payload_hdrs, PayloadHdrs},
{topics_payload, TopicPayload},
{count, Count1}
{count, Count1},
{quic, QConnOpts}
]),
WOpts1 = [{publish_signal_pid, PublishSignalPid} | WOpts],
proc_lib:spawn(?MODULE, run, [self(), PubSub, WOpts1, AddrList, HostList])
Expand All @@ -465,7 +467,7 @@ prepare(PubSub, Opts) ->
false ->
ok
end,
case proplists:get_bool(quic, Opts) of
case is_quic(Opts) of
true ->
maybe_start_quicer() orelse error({quic, not_supp_or_disabled}),
prepare_for_quic(Opts);
Expand Down Expand Up @@ -660,9 +662,10 @@ connect(Parent, N, PubSub, Opts) ->
ClientId = client_id(PubSub, N, Opts),
MqttOpts = [{clientid, ClientId},
{tcp_opts, tcp_opts(Opts)},
{ssl_opts, ssl_opts(Opts)}]
{ssl_opts, ssl_opts(Opts)},
{quic_opts, quic_opts(Opts, ClientId)}
]
++ session_property_opts(Opts)
++ quic_opts(Opts, ClientId)
++ mqtt_opts(Opts),
MqttOpts1 = case PubSub of
conn -> [{force_ping, true} | MqttOpts];
Expand Down Expand Up @@ -691,8 +694,8 @@ connect(Parent, N, PubSub, Opts) ->
undefined when TopicPayloadRend == undefined ->
erlang:send_after(RandomPubWaitMS, self(), publish);
undefined ->
maps:foreach(fun(TopicName, #{name := TopicName}) ->
erlang:send_after(RandomPubWaitMS, self(), {publish, TopicName})
maps:foreach(fun(TopicName, #{name := TopicName, interval_ms := DelayMs}) ->
erlang:send_after(DelayMs, self(), {publish, TopicName})
end, TopicPayloadRend);
_ ->
%% send `publish' only when all publishers
Expand Down Expand Up @@ -1097,7 +1100,7 @@ all_ssl_ciphers() ->

-spec connect_fun(proplists:proplist()) -> FunName :: atom().
connect_fun(Opts)->
case {proplists:get_bool(ws, Opts), proplists:get_bool(quic, Opts)} of
case {proplists:get_bool(ws, Opts), is_quic(Opts)} of
{true, true} ->
throw({error, "unsupported transport: ws over quic "});
{true, false} ->
Expand Down Expand Up @@ -1294,7 +1297,24 @@ shard_addr(N, AddrList) ->
Offset = N rem length(AddrList),
lists:nth(Offset + 1, AddrList).

quic_opts(Opts, ClientId) when is_binary(ClientId) ->
-spec quic_opts(proplists:proplist(), binary()) -> {proplists:proplist(), proplists:proplist()}.
quic_opts(Opts, ClientId) ->
Nst = quic_opts_nst(Opts, ClientId),
case proplists:get_value(quic, Opts, undefined) of
undefined ->
[];
false ->
[];
true ->
Nst;
[] ->
Nst;
[{ConnOpts, StrmOpts}]
when is_list(ConnOpts) andalso is_list(StrmOpts)->
{Nst++ConnOpts, StrmOpts}
end.

quic_opts_nst(Opts, ClientId) when is_binary(ClientId) ->
case proplists:get_value(nst_dets_file, Opts, undefined) of
undefined -> [];
_Filename ->
Expand Down Expand Up @@ -1595,3 +1615,29 @@ maybe_prefix_payload(Payload, ClientOpts) ->
PayloadHdrs ->
with_payload_headers(PayloadHdrs, Payload)
end.

-spec is_quic(proplists:proplist()) -> boolean().
is_quic(Opts) ->
proplists:get_value(quic, Opts, false) =/= false.

quic_opts_from_arg(Opts)->
case proplists:get_value(quic, Opts, false) of
V when is_boolean(V) ->
V;
"false" ->
false;
"true" ->
true;
V when is_list(V) ->
case filename:extension(V) of
".eterm" ->
case file:consult(V) of
{ok, ConnOpts} ->
ConnOpts;
{error, enoent} ->
error({"--quic "++ V, no_exists})
end;
_ ->
error("bad --quic")
end
end.

0 comments on commit deb45ad

Please sign in to comment.