Skip to content

Commit

Permalink
Merge pull request #241 from id/0124-support-random-bytes-in-payload
Browse files Browse the repository at this point in the history
feat: support random bytes in payload
  • Loading branch information
id authored Jan 24, 2024
2 parents 152fb22 + a379506 commit f02d86a
Showing 1 changed file with 44 additions and 34 deletions.
78 changes: 44 additions & 34 deletions src/emqtt_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@
{size, $s, "size", {integer, 256},
"payload size"},
{message, $m, "message", string,
"set the message content for publish"},
"Set the message content for publish. "
"Either a literal message content, or path to a file with payload template "
"specified via 'template://<file_path>'. "
"Available variables: %TIMESTAMP%, %TIMESTAMPMS%, %TIMESTAMPUS%, %TIMESTAMPNS%, %UNIQUE%, %RANDOM%. "
"When using 'template://', --size option does not have effect except for when %RANDOM% placeholder "
"is used."
},
{qos, $q, "qos", {integer, 0},
"subscribe qos"},
{qoe, $Q, "qoe", {boolean, false},
Expand Down Expand Up @@ -348,6 +354,7 @@ main(pub, Opts) ->
undefined
end,
start(pub, [ {payload, Payload}
, {payload_size, Size}
, {limit_fun, MsgLimit}
, {publish_signal_pid, PublishSignalPid}
| Opts]);
Expand Down Expand Up @@ -819,39 +826,41 @@ publish(Client, Opts) ->
ok = ensure_publish_begin_time(),
Flags = [{qos, proplists:get_value(qos, Opts)},
{retain, proplists:get_value(retain, Opts)}],
Payload0 = proplists:get_value(payload, Opts),
Payload = case Payload0 of
{template, Bin} ->
Now = os:system_time(nanosecond),
TsNS = integer_to_binary(Now),
TsUS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, microsecond)),
TsMS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, millisecond)),
Unique = integer_to_binary(erlang:unique_integer()),
Substitutions =
#{ <<"%TIMESTAMP%">> => TsMS
, <<"%TIMESTAMPMS%">> => TsMS
, <<"%TIMESTAMPUS%">> => TsUS
, <<"%TIMESTAMPNS%">> => TsNS
, <<"%UNIQUE%">> => Unique
},
maps:fold(
fun(Placeholder, Val, Acc) -> binary:replace(Acc, Placeholder, Val) end,
Bin,
Substitutions);
_ ->
Payload0
end,
%% prefix dynamic headers.
NewPayload = case proplists:get_value(payload_hdrs, Opts, []) of
[] -> Payload;
PayloadHdrs ->
with_payload_headers(PayloadHdrs, Payload)
end,
case emqtt:publish(Client, topic_opt(Opts), NewPayload, Flags) of
ok -> ok;
{ok, _} -> ok;
{error, Reason} -> {error, Reason}
end.
Size = proplists:get_value(payload_size, Opts),
Payload0 = proplists:get_value(payload, Opts),
Payload = case Payload0 of
{template, Bin} ->
Now = os:system_time(nanosecond),
TsNS = integer_to_binary(Now),
TsUS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, microsecond)),
TsMS = integer_to_binary(erlang:convert_time_unit(Now, nanosecond, millisecond)),
Unique = integer_to_binary(erlang:unique_integer()),
Substitutions =
#{ <<"%TIMESTAMP%">> => TsMS
, <<"%TIMESTAMPMS%">> => TsMS
, <<"%TIMESTAMPUS%">> => TsUS
, <<"%TIMESTAMPNS%">> => TsNS
, <<"%UNIQUE%">> => Unique
, <<"%RANDOM%">> => rand:bytes(Size)
},
maps:fold(
fun(Placeholder, Val, Acc) -> binary:replace(Acc, Placeholder, Val) end,
Bin,
Substitutions);
_ ->
Payload0
end,
%% prefix dynamic headers.
NewPayload = case proplists:get_value(payload_hdrs, Opts, []) of
[] -> Payload;
PayloadHdrs ->
with_payload_headers(PayloadHdrs, Payload)
end,
case emqtt:publish(Client, topic_opt(Opts), NewPayload, Flags) of
ok -> ok;
{ok, _} -> ok;
{error, Reason} -> {error, Reason}
end.

session_property_opts(Opts) ->
case session_property_opts(Opts, #{}) of
Expand Down Expand Up @@ -1052,6 +1061,7 @@ loop_opts(Opts) ->
lists:filter(fun({K,__V}) ->
lists:member(K, [ interval_of_msg
, payload
, payload_size
, payload_hdrs
, qos
, retain
Expand Down

0 comments on commit f02d86a

Please sign in to comment.