From 464226f9dfc79e6a8ef12972573a4ac761d42488 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 3 Aug 2023 16:07:25 +0200 Subject: [PATCH 1/3] feat: add retry_interval for pub command --- src/emqtt_bench.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index 4853081..2e20224 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -131,6 +131,11 @@ "Control where the log output goes. " "console: directly to the console " "null: quietly, don't output any logs." + }, + {retry_interval, undefined, "retry-interval", {integer, 0}, + "Publisher's resend interval (in seconds) if the expected " + "acknowledgement for a inflight packet is not " + "received within this interval. Default value 0 means no resend." } ]). @@ -884,6 +889,8 @@ mqtt_opts([{inflight, InFlight0}|Opts], Acc) -> _ -> InFlight0 end, mqtt_opts(Opts, [{max_inflight, InFlight} | Acc]); +mqtt_opts([{retry_interval, IntervalSeconds}|Opts], Acc) -> + mqtt_opts(Opts, [{retry_interval, IntervalSeconds}|Acc]); mqtt_opts([_|Opts], Acc) -> mqtt_opts(Opts, Acc). From 941444aa9e4965c89d66505f876993a1b639c0a7 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 3 Aug 2023 16:11:27 +0200 Subject: [PATCH 2/3] docs: add changelog for 0.4.13 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a410a1..3891fe3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # emqtt-bench changelog +## 0.4.13 + +* Add `--retry-interval` option to `pub` command and use `0` as default value (0 means disable resend). + ## 0.4.5 * Default value for `--inflight` option is changed from `0` (no back-pressure) to `1`. From 588b6ad8b2baf332cee6e14d4fd94081c028d789 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 3 Aug 2023 16:32:56 +0200 Subject: [PATCH 3/3] fix: elminate a potential race condition which cause publish dead-loop --- src/emqtt_bench.erl | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index 2e20224..abe9c44 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -333,7 +333,7 @@ main(pub, Opts) -> StrPayload -> unicode:characters_to_binary(StrPayload) end, - MsgLimit = consumer_pub_msg_fun_init(proplists:get_value(limit, Opts)), + MsgLimit = pub_limit_fun_init(proplists:get_value(limit, Opts)), PublishSignalPid = case proplists:get_value(wait_before_publishing, Opts) of true -> @@ -757,16 +757,21 @@ schedule_next_publish(Interval) -> end, ok. -consumer_pub_msg_fun_init(0) -> +pub_limit_fun_init(0) -> fun() -> true end; -consumer_pub_msg_fun_init(N) when is_integer(N), N > 0 -> +pub_limit_fun_init(N) when is_integer(N), N > 0 -> Ref = counters:new(1, []), counters:put(Ref, 1, N), fun() -> case counters:get(Ref, 1) of - 0 -> false; + 0 -> + false; + X when X < 0 -> + %% this means PUBLISH overrun the limit option, due to race + false; _ -> - counters:sub(Ref, 1, 1), true + counters:sub(Ref, 1, 1), + true end end.