diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a410a1..3891fe3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # emqtt-bench changelog +## 0.4.13 + +* Add `--retry-interval` option to `pub` command and use `0` as default value (0 means disable resend). + ## 0.4.5 * Default value for `--inflight` option is changed from `0` (no back-pressure) to `1`. diff --git a/src/emqtt_bench.erl b/src/emqtt_bench.erl index 4853081..abe9c44 100644 --- a/src/emqtt_bench.erl +++ b/src/emqtt_bench.erl @@ -131,6 +131,11 @@ "Control where the log output goes. " "console: directly to the console " "null: quietly, don't output any logs." + }, + {retry_interval, undefined, "retry-interval", {integer, 0}, + "Publisher's resend interval (in seconds) if the expected " + "acknowledgement for a inflight packet is not " + "received within this interval. Default value 0 means no resend." } ]). @@ -328,7 +333,7 @@ main(pub, Opts) -> StrPayload -> unicode:characters_to_binary(StrPayload) end, - MsgLimit = consumer_pub_msg_fun_init(proplists:get_value(limit, Opts)), + MsgLimit = pub_limit_fun_init(proplists:get_value(limit, Opts)), PublishSignalPid = case proplists:get_value(wait_before_publishing, Opts) of true -> @@ -752,16 +757,21 @@ schedule_next_publish(Interval) -> end, ok. -consumer_pub_msg_fun_init(0) -> +pub_limit_fun_init(0) -> fun() -> true end; -consumer_pub_msg_fun_init(N) when is_integer(N), N > 0 -> +pub_limit_fun_init(N) when is_integer(N), N > 0 -> Ref = counters:new(1, []), counters:put(Ref, 1, N), fun() -> case counters:get(Ref, 1) of - 0 -> false; + 0 -> + false; + X when X < 0 -> + %% this means PUBLISH overrun the limit option, due to race + false; _ -> - counters:sub(Ref, 1, 1), true + counters:sub(Ref, 1, 1), + true end end. @@ -884,6 +894,8 @@ mqtt_opts([{inflight, InFlight0}|Opts], Acc) -> _ -> InFlight0 end, mqtt_opts(Opts, [{max_inflight, InFlight} | Acc]); +mqtt_opts([{retry_interval, IntervalSeconds}|Opts], Acc) -> + mqtt_opts(Opts, [{retry_interval, IntervalSeconds}|Acc]); mqtt_opts([_|Opts], Acc) -> mqtt_opts(Opts, Acc).