Skip to content

Commit

Permalink
Merge pull request emqx#13554 from id/20240731-sync-release-57
Browse files Browse the repository at this point in the history
sync release-57
  • Loading branch information
id authored Jul 31, 2024
2 parents 6058b50 + 85cff5e commit e42021d
Show file tree
Hide file tree
Showing 44 changed files with 910 additions and 324 deletions.
8 changes: 6 additions & 2 deletions apps/emqx/include/logger.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,20 @@
).

%% NOTE: do not forget to use atom for msg and add every used msg to
%% the default value of `log.thorttling.msgs` list.
%% the default value of `log.throttling.msgs` list.
-define(SLOG_THROTTLE(Level, Data),
?SLOG_THROTTLE(Level, Data, #{})
).

-define(SLOG_THROTTLE(Level, Data, Meta),
?SLOG_THROTTLE(Level, undefined, Data, Meta)
).

-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
case logger:allow(Level, ?MODULE) of
true ->
(fun(#{msg := __Msg} = __Data) ->
case emqx_log_throttler:allow(__Msg) of
case emqx_log_throttler:allow(__Msg, UniqueKey) of
true ->
logger:log(Level, __Data, Meta);
false ->
Expand Down
72 changes: 56 additions & 16 deletions apps/emqx/src/emqx_log_throttler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-export([start_link/0]).

%% throttler API
-export([allow/1]).
-export([allow/2]).

%% gen_server callbacks
-export([
Expand All @@ -40,23 +40,29 @@
-define(SEQ_ID(Msg), {?MODULE, Msg}).
-define(NEW_SEQ, atomics:new(1, [{signed, false}])).
-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).

-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).

-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).

-spec allow(atom()) -> boolean().
allow(Msg) when is_atom(Msg) ->
%% @doc Check if a throttled log message is allowed to pass down to the logger this time.
%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined'
%% for predefined message IDs.
%% For relatively static resources created from configurations such as data integration
%% resource IDs `UniqueKey' should be of `binary()' type.
-spec allow(atom(), undefined | binary()) -> boolean().
allow(Msg, UniqueKey) when
is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
->
case emqx_logger:get_primary_log_level() of
debug ->
true;
_ ->
do_allow(Msg)
do_allow(Msg, UniqueKey)
end.

-spec start_link() -> startlink_ret().
Expand All @@ -68,7 +74,8 @@ start_link() ->
%%--------------------------------------------------------------------

init([]) ->
ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
process_flag(trap_exit, true),
ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
CurrentPeriodMs = ?TIME_WINDOW_MS,
TimerRef = schedule_refresh(CurrentPeriodMs),
{ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
Expand All @@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
DroppedStats = lists:foldl(
fun(Msg, Acc) ->
case ?GET_SEQ(Msg) of
%% Should not happen, unless the static ids list is updated at run-time.
undefined ->
?NEW_THROTTLE(Msg, ?NEW_SEQ),
%% Should not happen, unless the static ids list is updated at run-time.
new_throttler(Msg),
?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
Acc;
SeqMap when is_map(SeqMap) ->
maps:fold(
fun(Key, Ref, Acc0) ->
ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
drop_stats(Ref, ID, Acc0)
end,
Acc,
SeqMap
);
SeqRef ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc)
drop_stats(SeqRef, Msg, Acc)
end
end,
#{},
Expand All @@ -112,7 +125,16 @@ handle_info(Info, State) ->
?SLOG(error, #{msg => "unxpected_info", info => Info}),
{noreply, State}.

drop_stats(SeqRef, Msg, Acc) ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc).

terminate(_Reason, _State) ->
%% atomics do not have delete/remove/release/deallocate API
%% after the reference is garbage-collected the resource is released
lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
ok.

code_change(_OldVsn, State, _Extra) ->
Expand All @@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
%%--------------------------------------------------------------------

do_allow(Msg) ->
do_allow(Msg, UniqueKey) ->
case persistent_term:get(?SEQ_ID(Msg), undefined) of
undefined ->
%% This is either a race condition (emqx_log_throttler is not started yet)
%% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
%% not added to the default value of `log.throttling.msgs`.
?SLOG(info, #{
msg => "missing_log_throttle_sequence",
?SLOG(debug, #{
msg => "log_throttle_disabled",
throttled_msg => Msg
}),
true;
%% e.g: unrecoverable msg throttle according resource_id
SeqMap when is_map(SeqMap) ->
case maps:find(UniqueKey, SeqMap) of
{ok, SeqRef} ->
?IS_ALLOWED(SeqRef);
error ->
SeqRef = ?NEW_SEQ,
new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
true
end;
SeqRef ->
?IS_ALLOWED(SeqRef)
end.
Expand All @@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
schedule_refresh(PeriodMs) ->
?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
erlang:send_after(PeriodMs, ?MODULE, refresh).

new_throttler(unrecoverable_resource_error = Msg) ->
new_throttler(Msg, #{});
new_throttler(Msg) ->
new_throttler(Msg, ?NEW_SEQ).

new_throttler(Msg, AtomicOrEmptyMap) ->
persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).
78 changes: 65 additions & 13 deletions apps/emqx/test/emqx_log_throttler_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
%% Have to use real msgs, as the schema is guarded by enum.
-define(THROTTLE_MSG, authorization_permission_denied).
-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
-define(TIME_WINDOW, <<"1s">>).

all() -> emqx_common_test_helpers:all(?MODULE).
Expand Down Expand Up @@ -59,6 +60,11 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files().

init_per_testcase(t_throttle_recoverable_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
Config;
init_per_testcase(t_throttle_add_new_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
Expand All @@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.

end_per_testcase(t_throttle_recoverable_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
ok;
end_per_testcase(t_throttle_add_new_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
Expand Down Expand Up @@ -101,8 +111,8 @@ t_throttle(_Config) ->
5000
),

?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
Expand All @@ -115,14 +125,48 @@ t_throttle(_Config) ->
[]
).

t_throttle_recoverable_msg(_Config) ->
ResourceId = <<"resource_id">>,
ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]),
?check_trace(
begin
%% Warm-up and block to increase the probability that next events
%% will be in the same throttling time window.
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
5000
),
{_, {ok, _}} = ?wait_async_action(
events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg
},
5000
),

?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg,
dropped_count := 1
},
3000
)
end,
[]
).

t_throttle_add_new_msg(_Config) ->
?check_trace(
begin
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
Expand All @@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) ->

t_throttle_no_msg(_Config) ->
%% Must simply pass with no crashes
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
timer:sleep(10),
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
Pid = erlang:whereis(emqx_log_throttler),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
%% assert process is not restarted
?assertEqual(Pid, erlang:whereis(emqx_log_throttler)),
%% make a gen_call to ensure the process is alive
%% note: this call result in an 'unexpected_call' error log.
?assertEqual(ignored, gen_server:call(Pid, probe)),
ok.

t_update_time_window(_Config) ->
?check_trace(
Expand Down Expand Up @@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) ->
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
Expand All @@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) ->
%%--------------------------------------------------------------------

events(Msg) ->
events(100, Msg).
events(100, Msg, undefined).

events(Msg, Id) ->
events(100, Msg, Id).

events(N, Msg) ->
[emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
events(N, Msg, Id) ->
[emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].

module_exists(Mod) ->
case erlang:module_loaded(Mod) of
Expand Down
2 changes: 2 additions & 0 deletions apps/emqx_auth_http/include/emqx_auth_http.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@
-define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}).
-define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}).

-define(AUTHN_DATA_FIELDS, [is_superuser, client_attrs, expire_at, acl]).

-endif.
2 changes: 1 addition & 1 deletion apps/emqx_auth_http/src/emqx_auth_http_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
start(_StartType, _StartArgs) ->
ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_restapi),
{ok, Sup} = emqx_auth_http_sup:start_link(),
{ok, Sup}.

Expand Down
Loading

0 comments on commit e42021d

Please sign in to comment.