Skip to content

Commit

Permalink
Merge pull request #182 from emqx/fix-inaccuracy-in-low-intv
Browse files Browse the repository at this point in the history
fix: fix the rate calculation error introduced by #179
  • Loading branch information
zmstone authored Jul 19, 2022
2 parents 585ac78 + 7c20bbd commit 8f92ed0
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions src/emqtt_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ maybe_retry(Parent, N, PubSub, Opts, ContinueFn) ->
end.

loop(Parent, N, Client, PubSub, Opts) ->
Idle = max(proplists:get_value(interval_of_msg, Opts, 0) * 2, 500),
Interval = proplists:get_value(interval_of_msg, Opts, 0),
Idle = max(Interval * 2, 500),
MRef = proplists:get_value(publish_signal_mref, Opts),
receive
{'DOWN', MRef, process, _Pid, start_publishing} ->
Expand All @@ -602,11 +603,14 @@ loop(Parent, N, Client, PubSub, Opts) ->
publish ->
case (proplists:get_value(limit_fun, Opts))() of
true ->
%% this call hangs if emqtt inflight is full
%% this call hangs if emqtt inflight is full
case publish(Client, Opts) of
ok -> next_publish(Opts);
{ok, _} -> next_publish(Opts);
ok ->
inc_counter(pub),
ok = schedule_next_publish(Interval),
ok;
{error, Reason} ->
%% TODO: schedule next publish for retry ?
inc_counter(pub_fail),
io:format("client(~w): publish error - ~p~n", [N, Reason])
end,
Expand Down Expand Up @@ -644,26 +648,37 @@ loop(Parent, N, Client, PubSub, Opts) ->
proc_lib:hibernate(?MODULE, loop, [Parent, N, Client, PubSub, Opts])
end.

put_publish_begin_time() ->
case get(last_publish_ts) of
ensure_publish_begin_time() ->
case get_publish_begin_time() of
undefined ->
NowT = erlang:monotonic_time(millisecond),
put(last_publish_ts, NowT);
put(publish_begin_ts, NowT),
ok;
_ ->
ok
end,
ok.
end.

get_publish_begin_time() ->
get(publish_begin_ts).

next_publish(Opts) ->
inc_counter(pub),
BeginTime = get(last_publish_ts),
PubCnt = get_counter(pub),
Interval = proplists:get_value(interval_of_msg, Opts),
NextTime = BeginTime + PubCnt * Interval,
%% @doc return new value
bump_publish_attempt_counter() ->
NewCount = case get(success_publish_count) of
undefined ->
1;
Val ->
Val + 1
end,
_ = put(success_publish_count, NewCount),
NewCount.

schedule_next_publish(Interval) ->
PubAttempted = bump_publish_attempt_counter(),
BeginTime = get_publish_begin_time(),
NextTime = BeginTime + PubAttempted * Interval,
NowT = erlang:monotonic_time(millisecond),
Remain = NextTime - NowT,
Interval > 0 andalso Remain < 0 andalso inc_counter(pub_overrun),

case Remain > 0 of
true -> _ = erlang:send_after(Remain, self(), publish);
false -> self() ! publish
Expand Down Expand Up @@ -696,11 +711,17 @@ subscribe(Client, N, Opts) ->
Res.

publish(Client, Opts) ->
ok = put_publish_begin_time(),
%% Ensure publish begin time is initialized right before the first publish,
%% because the first publish may get delayed (after entering the loop)
ok = ensure_publish_begin_time(),
Flags = [{qos, proplists:get_value(qos, Opts)},
{retain, proplists:get_value(retain, Opts)}],
Payload = proplists:get_value(payload, Opts),
emqtt:publish(Client, topic_opt(Opts), Payload, Flags).
case emqtt:publish(Client, topic_opt(Opts), Payload, Flags) of
ok -> ok;
{ok, _} -> ok;
{error, Reason} -> {error, Reason}
end.

session_property_opts(Opts) ->
case session_property_opts(Opts, #{}) of
Expand Down

0 comments on commit 8f92ed0

Please sign in to comment.