diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 0083a966049..a65b05089a8 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -41,16 +41,20 @@ ). %% NOTE: do not forget to use atom for msg and add every used msg to -%% the default value of `log.thorttling.msgs` list. +%% the default value of `log.throttling.msgs` list. -define(SLOG_THROTTLE(Level, Data), ?SLOG_THROTTLE(Level, Data, #{}) ). -define(SLOG_THROTTLE(Level, Data, Meta), + ?SLOG_THROTTLE(Level, undefined, Data, Meta) +). + +-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta), case logger:allow(Level, ?MODULE) of true -> (fun(#{msg := __Msg} = __Data) -> - case emqx_log_throttler:allow(__Msg) of + case emqx_log_throttler:allow(__Msg, UniqueKey) of true -> logger:log(Level, __Data, Meta); false -> diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index 3ebc268fa63..928580e2bd9 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -25,7 +25,7 @@ -export([start_link/0]). %% throttler API --export([allow/1]). +-export([allow/2]). %% gen_server callbacks -export([ @@ -40,23 +40,29 @@ -define(SEQ_ID(Msg), {?MODULE, Msg}). -define(NEW_SEQ, atomics:new(1, [{signed, false}])). -define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)). +-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))). -define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)). -define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)). -define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1). -define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1). --define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)). - -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(atom()) -> boolean(). -allow(Msg) when is_atom(Msg) -> +%% @doc Check if a throttled log message is allowed to pass down to the logger this time. +%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined' +%% for predefined message IDs. +%% For relatively static resources created from configurations such as data integration +%% resource IDs `UniqueKey' should be of `binary()' type. +-spec allow(atom(), undefined | binary()) -> boolean(). +allow(Msg, UniqueKey) when + is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined) +-> case emqx_logger:get_primary_log_level() of debug -> true; _ -> - do_allow(Msg) + do_allow(Msg, UniqueKey) end. -spec start_link() -> startlink_ret(). @@ -68,7 +74,8 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> - ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST), + process_flag(trap_exit, true), + ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST), CurrentPeriodMs = ?TIME_WINDOW_MS, TimerRef = schedule_refresh(CurrentPeriodMs), {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. @@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> DroppedStats = lists:foldl( fun(Msg, Acc) -> case ?GET_SEQ(Msg) of - %% Should not happen, unless the static ids list is updated at run-time. undefined -> - ?NEW_THROTTLE(Msg, ?NEW_SEQ), + %% Should not happen, unless the static ids list is updated at run-time. + new_throttler(Msg), ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), Acc; + SeqMap when is_map(SeqMap) -> + maps:fold( + fun(Key, Ref, Acc0) -> + ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]), + drop_stats(Ref, ID, Acc0) + end, + Acc, + SeqMap + ); SeqRef -> - Dropped = ?GET_DROPPED(SeqRef), - ok = ?RESET_SEQ(SeqRef), - ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), - maybe_add_dropped(Msg, Dropped, Acc) + drop_stats(SeqRef, Msg, Acc) end end, #{}, @@ -112,7 +125,16 @@ handle_info(Info, State) -> ?SLOG(error, #{msg => "unxpected_info", info => Info}), {noreply, State}. +drop_stats(SeqRef, Msg, Acc) -> + Dropped = ?GET_DROPPED(SeqRef), + ok = ?RESET_SEQ(SeqRef), + ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), + maybe_add_dropped(Msg, Dropped, Acc). + terminate(_Reason, _State) -> + %% atomics do not have delete/remove/release/deallocate API + %% after the reference is garbage-collected the resource is released + lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST), ok. code_change(_OldVsn, State, _Extra) -> @@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions %%-------------------------------------------------------------------- -do_allow(Msg) -> +do_allow(Msg, UniqueKey) -> case persistent_term:get(?SEQ_ID(Msg), undefined) of undefined -> %% This is either a race condition (emqx_log_throttler is not started yet) %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is %% not added to the default value of `log.throttling.msgs`. - ?SLOG(info, #{ - msg => "missing_log_throttle_sequence", + ?SLOG(debug, #{ + msg => "log_throttle_disabled", throttled_msg => Msg }), true; + %% e.g: unrecoverable msg throttle according resource_id + SeqMap when is_map(SeqMap) -> + case maps:find(UniqueKey, SeqMap) of + {ok, SeqRef} -> + ?IS_ALLOWED(SeqRef); + error -> + SeqRef = ?NEW_SEQ, + new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}), + true + end; SeqRef -> ?IS_ALLOWED(SeqRef) end. @@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) -> schedule_refresh(PeriodMs) -> ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}), erlang:send_after(PeriodMs, ?MODULE, refresh). + +new_throttler(unrecoverable_resource_error = Msg) -> + new_throttler(Msg, #{}); +new_throttler(Msg) -> + new_throttler(Msg, ?NEW_SEQ). + +new_throttler(Msg, AtomicOrEmptyMap) -> + persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap). diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index 8b3ac020755..f95d6296932 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -26,6 +26,7 @@ %% Have to use real msgs, as the schema is guarded by enum. -define(THROTTLE_MSG, authorization_permission_denied). -define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized). +-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error). -define(TIME_WINDOW, <<"1s">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -59,6 +60,11 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)), emqx_config:delete_override_conf_files(). +init_per_testcase(t_throttle_recoverable_msg, Config) -> + ok = snabbkaffe:start_trace(), + [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}), + Config; init_per_testcase(t_throttle_add_new_msg, Config) -> ok = snabbkaffe:start_trace(), [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), @@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) -> ok = snabbkaffe:start_trace(), Config. +end_per_testcase(t_throttle_recoverable_msg, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), + ok; end_per_testcase(t_throttle_add_new_msg, _Config) -> ok = snabbkaffe:stop(), {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), @@ -101,8 +111,8 @@ t_throttle(_Config) -> 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -115,14 +125,48 @@ t_throttle(_Config) -> [] ). +t_throttle_recoverable_msg(_Config) -> + ResourceId = <<"resource_id">>, + ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]), + ?check_trace( + begin + %% Warm-up and block to increase the probability that next events + %% will be in the same throttling time window. + {ok, _} = ?block_until( + #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG}, + 5000 + ), + {_, {ok, _}} = ?wait_async_action( + events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId), + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg + }, + 5000 + ), + + ?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + t_throttle_add_new_msg(_Config) -> ?check_trace( begin {ok, _} = ?block_until( #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), - timer:sleep(10), - ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). + Pid = erlang:whereis(emqx_log_throttler), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), + %% assert process is not restarted + ?assertEqual(Pid, erlang:whereis(emqx_log_throttler)), + %% make a gen_call to ensure the process is alive + %% note: this call result in an 'unexpected_call' error log. + ?assertEqual(ignored, gen_server:call(Pid, probe)), + ok. t_update_time_window(_Config) -> ?check_trace( @@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) -> #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) -> %%-------------------------------------------------------------------- events(Msg) -> - events(100, Msg). + events(100, Msg, undefined). + +events(Msg, Id) -> + events(100, Msg, Id). -events(N, Msg) -> - [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)]. +events(N, Msg, Id) -> + [emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)]. module_exists(Mod) -> case erlang:module_loaded(Mod) of diff --git a/apps/emqx_auth_http/include/emqx_auth_http.hrl b/apps/emqx_auth_http/include/emqx_auth_http.hrl index c0bfa217788..439087e9c48 100644 --- a/apps/emqx_auth_http/include/emqx_auth_http.hrl +++ b/apps/emqx_auth_http/include/emqx_auth_http.hrl @@ -31,4 +31,6 @@ -define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}). -define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}). +-define(AUTHN_DATA_FIELDS, [is_superuser, client_attrs, expire_at, acl]). + -endif. diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index 3d8ae0dadd8..8b7d08c4ef2 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -25,7 +25,7 @@ start(_StartType, _StartArgs) -> ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http), ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http), - ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http), + ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_restapi), {ok, Sup} = emqx_auth_http_sup:start_link(), {ok, Sup}. diff --git a/apps/emqx_auth_http/src/emqx_authn_http.erl b/apps/emqx_auth_http/src/emqx_authn_http.erl index 4e7770334b6..67d7403ea4d 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http.erl @@ -32,7 +32,9 @@ with_validated_config/2, generate_request/2, request_for_log/2, - response_for_log/1 + response_for_log/1, + extract_auth_data/2, + safely_parse_body/2 ]). -define(DEFAULT_CONTENT_TYPE, <<"application/json">>). @@ -194,34 +196,14 @@ handle_response(Headers, Body) -> case safely_parse_body(ContentType, Body) of {ok, NBody} -> body_to_auth_data(NBody); - {error, Reason} -> - ?TRACE_AUTHN_PROVIDER( - error, - "parse_http_response_failed", - #{content_type => ContentType, body => Body, reason => Reason} - ), + {error, _Reason} -> ignore end. body_to_auth_data(Body) -> case maps:get(<<"result">>, Body, <<"ignore">>) of <<"allow">> -> - IsSuperuser = emqx_authn_utils:is_superuser(Body), - Attrs = emqx_authn_utils:client_attrs(Body), - try - ExpireAt = expire_at(Body), - ACL = acl(ExpireAt, Body), - Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]), - {ok, Result} - catch - throw:{bad_acl_rule, Reason} -> - %% it's a invalid token, so ok to log - ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}), - {error, bad_username_or_password}; - throw:Reason -> - ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}), - {error, bad_username_or_password} - end; + extract_auth_data(http, Body); <<"deny">> -> {error, not_authorized}; <<"ignore">> -> @@ -230,6 +212,24 @@ body_to_auth_data(Body) -> ignore end. +extract_auth_data(Source, Body) -> + IsSuperuser = emqx_authn_utils:is_superuser(Body), + Attrs = emqx_authn_utils:client_attrs(Body), + try + ExpireAt = expire_at(Body), + ACL = acl(ExpireAt, Source, Body), + Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]), + {ok, Result} + catch + throw:{bad_acl_rule, Reason} -> + %% it's a invalid token, so ok to log + ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}), + {error, bad_username_or_password}; + throw:Reason -> + ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}), + {error, bad_username_or_password} + end. + merge_maps([]) -> #{}; merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)). @@ -268,40 +268,43 @@ expire_sec(#{<<"expire_at">> := _}) -> expire_sec(_) -> undefined. -acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) -> +acl(#{expire_at := ExpireTimeMs}, Source, #{<<"acl">> := Rules}) -> #{ acl => #{ - source_for_logging => http, + source_for_logging => Source, rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules), %% It's seconds level precision (like JWT) for authz %% see emqx_authz_client_info:check/1 expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second) } }; -acl(_NoExpire, #{<<"acl">> := Rules}) -> +acl(_NoExpire, Source, #{<<"acl">> := Rules}) -> #{ acl => #{ - source_for_logging => http, + source_for_logging => Source, rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules) } }; -acl(_, _) -> +acl(_, _, _) -> #{}. safely_parse_body(ContentType, Body) -> try parse_body(ContentType, Body) catch - _Class:_Reason -> + _Class:Reason -> + ?TRACE_AUTHN_PROVIDER( + error, + "parse_http_response_failed", + #{content_type => ContentType, body => Body, reason => Reason} + ), {error, invalid_body} end. parse_body(<<"application/json", _/binary>>, Body) -> {ok, emqx_utils_json:decode(Body, [return_maps])}; parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) -> - Flags = [<<"result">>, <<"is_superuser">>], - RawMap = maps:from_list(cow_qs:parse_qs(Body)), - NBody = maps:with(Flags, RawMap), + NBody = maps:from_list(cow_qs:parse_qs(Body)), {ok, NBody}; parse_body(ContentType, _) -> {error, {unsupported_content_type, ContentType}}. diff --git a/apps/emqx_auth_http/src/emqx_authn_scram_http.erl b/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl similarity index 72% rename from apps/emqx_auth_http/src/emqx_authn_scram_http.erl rename to apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl index 0e6190b4b9e..abb91f13052 100644 --- a/apps/emqx_auth_http/src/emqx_authn_scram_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl @@ -2,10 +2,19 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http). +%% Note: +%% This is not an implementation of the RFC 7804: +%% Salted Challenge Response HTTP Authentication Mechanism. +%% This backend is an implementation of scram, +%% which uses an external web resource as a source of user information. --include_lib("emqx_auth/include/emqx_authn.hrl"). +-module(emqx_authn_scram_restapi). + +-feature(maybe_expr, enable). + +-include("emqx_auth_http.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_auth/include/emqx_authn.hrl"). -behaviour(emqx_authn_provider). @@ -22,10 +31,6 @@ <<"salt">> ]). --define(OPTIONAL_USER_INFO_KEYS, [ - <<"is_superuser">> -]). - %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -72,7 +77,9 @@ authenticate( reason => Reason }) end, - emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State); + emqx_utils_scram:authenticate( + AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, ?AUTHN_DATA_FIELDS + ); authenticate(_Credential, _State) -> ignore. @@ -95,7 +102,7 @@ retrieve( ) -> Request = emqx_authn_http:generate_request(Credential#{username := Username}, State), Response = emqx_resource:simple_sync_query(ResourceId, {Method, Request, RequestTimeout}), - ?TRACE_AUTHN_PROVIDER("scram_http_response", #{ + ?TRACE_AUTHN_PROVIDER("scram_restapi_response", #{ request => emqx_authn_http:request_for_log(Credential, State), response => emqx_authn_http:response_for_log(Response), resource => ResourceId @@ -113,16 +120,11 @@ retrieve( handle_response(Headers, Body) -> ContentType = proplists:get_value(<<"content-type">>, Headers), - case safely_parse_body(ContentType, Body) of - {ok, NBody} -> - body_to_user_info(NBody); - {error, Reason} = Error -> - ?TRACE_AUTHN_PROVIDER( - error, - "parse_scram_http_response_failed", - #{content_type => ContentType, body => Body, reason => Reason} - ), - Error + maybe + {ok, NBody} ?= emqx_authn_http:safely_parse_body(ContentType, Body), + {ok, UserInfo} ?= body_to_user_info(NBody), + {ok, AuthData} ?= emqx_authn_http:extract_auth_data(scram_restapi, NBody), + {ok, maps:merge(AuthData, UserInfo)} end. body_to_user_info(Body) -> @@ -131,26 +133,16 @@ body_to_user_info(Body) -> true -> case safely_convert_hex(Required0) of {ok, Required} -> - UserInfo0 = maps:merge(Required, maps:with(?OPTIONAL_USER_INFO_KEYS, Body)), - UserInfo1 = emqx_utils_maps:safe_atom_key_map(UserInfo0), - UserInfo = maps:merge(#{is_superuser => false}, UserInfo1), - {ok, UserInfo}; + {ok, emqx_utils_maps:safe_atom_key_map(Required)}; Error -> + ?TRACE_AUTHN_PROVIDER("decode_keys_failed", #{http_body => Body}), Error end; _ -> - ?TRACE_AUTHN_PROVIDER("bad_response_body", #{http_body => Body}), + ?TRACE_AUTHN_PROVIDER("missing_requried_keys", #{http_body => Body}), {error, bad_response} end. -safely_parse_body(ContentType, Body) -> - try - parse_body(ContentType, Body) - catch - _Class:_Reason -> - {error, invalid_body} - end. - safely_convert_hex(Required) -> try {ok, @@ -165,15 +157,5 @@ safely_convert_hex(Required) -> {error, Reason} end. -parse_body(<<"application/json", _/binary>>, Body) -> - {ok, emqx_utils_json:decode(Body, [return_maps])}; -parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) -> - Flags = ?REQUIRED_USER_INFO_KEYS ++ ?OPTIONAL_USER_INFO_KEYS, - RawMap = maps:from_list(cow_qs:parse_qs(Body)), - NBody = maps:with(Flags, RawMap), - {ok, NBody}; -parse_body(ContentType, _) -> - {error, {unsupported_content_type, ContentType}}. - merge_scram_conf(Conf, State) -> maps:merge(maps:with([algorithm, iteration_count], Conf), State). diff --git a/apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl b/apps/emqx_auth_http/src/emqx_authn_scram_restapi_schema.erl similarity index 88% rename from apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl rename to apps/emqx_auth_http/src/emqx_authn_scram_restapi_schema.erl index ca43fe3a6ca..bf3398abb5e 100644 --- a/apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl +++ b/apps/emqx_auth_http/src/emqx_authn_scram_restapi_schema.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http_schema). +-module(emqx_authn_scram_restapi_schema). -behaviour(emqx_authn_schema). @@ -22,16 +22,16 @@ namespace() -> "authn". refs() -> - [?R_REF(scram_http_get), ?R_REF(scram_http_post)]. + [?R_REF(scram_restapi_get), ?R_REF(scram_restapi_post)]. select_union_member( #{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN} = Value ) -> case maps:get(<<"method">>, Value, undefined) of <<"get">> -> - [?R_REF(scram_http_get)]; + [?R_REF(scram_restapi_get)]; <<"post">> -> - [?R_REF(scramm_http_post)]; + [?R_REF(scram_restapi_post)]; Else -> throw(#{ reason => "unknown_http_method", @@ -43,20 +43,20 @@ select_union_member( select_union_member(_Value) -> undefined. -fields(scram_http_get) -> +fields(scram_restapi_get) -> [ {method, #{type => get, required => true, desc => ?DESC(emqx_authn_http_schema, method)}}, {headers, fun emqx_authn_http_schema:headers_no_content_type/1} ] ++ common_fields(); -fields(scram_http_post) -> +fields(scram_restapi_post) -> [ {method, #{type => post, required => true, desc => ?DESC(emqx_authn_http_schema, method)}}, {headers, fun emqx_authn_http_schema:headers/1} ] ++ common_fields(). -desc(scram_http_get) -> +desc(scram_restapi_get) -> ?DESC(emqx_authn_http_schema, get); -desc(scram_http_post) -> +desc(scram_restapi_post) -> ?DESC(emqx_authn_http_schema, post); desc(_) -> undefined. diff --git a/apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl similarity index 81% rename from apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl rename to apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl index b00212cb1c2..7963cf1e38b 100644 --- a/apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http_SUITE). +-module(emqx_authn_scram_restapi_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -21,6 +21,9 @@ -define(ALGORITHM_STR, <<"sha512">>). -define(ITERATION_COUNT, 4096). +-define(T_ACL_USERNAME, <<"username">>). +-define(T_ACL_PASSWORD, <<"password">>). + -include_lib("emqx/include/emqx_placeholder.hrl"). all() -> @@ -54,11 +57,11 @@ init_per_testcase(_Case, Config) -> [authentication], ?GLOBAL ), - {ok, _} = emqx_authn_scram_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH), + {ok, _} = emqx_authn_scram_restapi_test_server:start_link(?HTTP_PORT, ?HTTP_PATH), Config. end_per_testcase(_Case, _Config) -> - ok = emqx_authn_scram_http_test_server:stop(). + ok = emqx_authn_scram_restapi_test_server:stop(). %%------------------------------------------------------------------------------ %% Tests @@ -72,7 +75,9 @@ t_create(_Config) -> {create_authenticator, ?GLOBAL, AuthConfig} ), - {ok, [#{provider := emqx_authn_scram_http}]} = emqx_authn_chains:list_authenticators(?GLOBAL). + {ok, [#{provider := emqx_authn_scram_restapi}]} = emqx_authn_chains:list_authenticators( + ?GLOBAL + ). t_create_invalid(_Config) -> AuthConfig = raw_config(), @@ -118,59 +123,8 @@ t_authenticate(_Config) -> ok = emqx_config:put([mqtt, idle_timeout], 500), - {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), - - ClientFirstMessage = esasl_scram:client_first_message(Username), - - ConnectPacket = ?CONNECT_PACKET( - #mqtt_packet_connect{ - proto_ver = ?MQTT_PROTO_V5, - properties = #{ - 'Authentication-Method' => <<"SCRAM-SHA-512">>, - 'Authentication-Data' => ClientFirstMessage - } - } - ), - - ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), - - %% Intentional sleep to trigger idle timeout for the connection not yet authenticated - ok = ct:sleep(1000), - - ?AUTH_PACKET( - ?RC_CONTINUE_AUTHENTICATION, - #{'Authentication-Data' := ServerFirstMessage} - ) = receive_packet(), - - {continue, ClientFinalMessage, ClientCache} = - esasl_scram:check_server_first_message( - ServerFirstMessage, - #{ - client_first_message => ClientFirstMessage, - password => Password, - algorithm => ?ALGORITHM - } - ), - - AuthContinuePacket = ?AUTH_PACKET( - ?RC_CONTINUE_AUTHENTICATION, - #{ - 'Authentication-Method' => <<"SCRAM-SHA-512">>, - 'Authentication-Data' => ClientFinalMessage - } - ), - - ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket), - - ?CONNACK_PACKET( - ?RC_SUCCESS, - _, - #{'Authentication-Data' := ServerFinalMessage} - ) = receive_packet(), - - ok = esasl_scram:check_server_final_message( - ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM} - ). + {ok, Pid} = create_connection(Username, Password), + emqx_authn_mqtt_test_client:stop(Pid). t_authenticate_bad_props(_Config) -> Username = <<"u">>, @@ -314,6 +268,47 @@ t_destroy(_Config) -> _ ) = receive_packet(). +t_acl(_Config) -> + init_auth(), + + ACL = emqx_authn_http_SUITE:acl_rules(), + set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{acl => ACL}), + {ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD), + + Cases = [ + {allow, <<"http-authn-acl/#">>}, + {deny, <<"http-authn-acl/1">>}, + {deny, <<"t/#">>} + ], + + try + lists:foreach( + fun(Case) -> + test_acl(Case, Pid) + end, + Cases + ) + after + ok = emqx_authn_mqtt_test_client:stop(Pid) + end. + +t_auth_expire(_Config) -> + init_auth(), + + ExpireSec = 3, + WaitTime = timer:seconds(ExpireSec + 1), + ACL = emqx_authn_http_SUITE:acl_rules(), + + set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{ + acl => ACL, + expire_at => + erlang:system_time(second) + ExpireSec + }), + {ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD), + + timer:sleep(WaitTime), + ?assertEqual(false, erlang:is_process_alive(Pid)). + t_is_superuser() -> State = init_auth(), ok = test_is_superuser(State, false), @@ -324,12 +319,12 @@ test_is_superuser(State, ExpectedIsSuperuser) -> Username = <<"u">>, Password = <<"p">>, - set_user_handler(Username, Password, ExpectedIsSuperuser), + set_user_handler(Username, Password, #{is_superuser => ExpectedIsSuperuser}), ClientFirstMessage = esasl_scram:client_first_message(Username), {continue, ServerFirstMessage, ServerCache} = - emqx_authn_scram_http:authenticate( + emqx_authn_scram_restapi:authenticate( #{ auth_method => <<"SCRAM-SHA-512">>, auth_data => ClientFirstMessage, @@ -349,7 +344,7 @@ test_is_superuser(State, ExpectedIsSuperuser) -> ), {ok, UserInfo1, ServerFinalMessage} = - emqx_authn_scram_http:authenticate( + emqx_authn_scram_restapi:authenticate( #{ auth_method => <<"SCRAM-SHA-512">>, auth_data => ClientFinalMessage, @@ -382,24 +377,25 @@ raw_config() -> }. set_user_handler(Username, Password) -> - set_user_handler(Username, Password, false). -set_user_handler(Username, Password, IsSuperuser) -> + set_user_handler(Username, Password, #{is_superuser => false}). +set_user_handler(Username, Password, Extra0) -> %% HTTP Server Handler = fun(Req0, State) -> #{ username := Username } = cowboy_req:match_qs([username], Req0), - UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT, IsSuperuser), + UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT), + Extra = maps:merge(#{is_superuser => false}, Extra0), Req = cowboy_req:reply( 200, #{<<"content-type">> => <<"application/json">>}, - emqx_utils_json:encode(UserInfo), + emqx_utils_json:encode(maps:merge(Extra, UserInfo)), Req0 ), {ok, Req, State} end, - ok = emqx_authn_scram_http_test_server:set_handler(Handler). + ok = emqx_authn_scram_restapi_test_server:set_handler(Handler). init_auth() -> init_auth(raw_config()). @@ -413,7 +409,7 @@ init_auth(Config) -> {ok, [#{state := State}]} = emqx_authn_chains:list_authenticators(?GLOBAL), State. -make_user_info(Password, Algorithm, IterationCount, IsSuperuser) -> +make_user_info(Password, Algorithm, IterationCount) -> {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info( Password, #{ @@ -424,8 +420,7 @@ make_user_info(Password, Algorithm, IterationCount, IsSuperuser) -> #{ stored_key => binary:encode_hex(StoredKey), server_key => binary:encode_hex(ServerKey), - salt => binary:encode_hex(Salt), - is_superuser => IsSuperuser + salt => binary:encode_hex(Salt) }. receive_packet() -> @@ -436,3 +431,79 @@ receive_packet() -> after 1000 -> ct:fail("Deliver timeout") end. + +create_connection(Username, Password) -> + {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ClientFirstMessage = esasl_scram:client_first_message(Username), + + ConnectPacket = ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFirstMessage + } + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), + + %% Intentional sleep to trigger idle timeout for the connection not yet authenticated + ok = ct:sleep(1000), + + ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{'Authentication-Data' := ServerFirstMessage} + ) = receive_packet(), + + {continue, ClientFinalMessage, ClientCache} = + esasl_scram:check_server_first_message( + ServerFirstMessage, + #{ + client_first_message => ClientFirstMessage, + password => Password, + algorithm => ?ALGORITHM + } + ), + + AuthContinuePacket = ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFinalMessage + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket), + + ?CONNACK_PACKET( + ?RC_SUCCESS, + _, + #{'Authentication-Data' := ServerFinalMessage} + ) = receive_packet(), + + ok = esasl_scram:check_server_final_message( + ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM} + ), + {ok, Pid}. + +test_acl({allow, Topic}, C) -> + ?assertMatch( + [0], + send_subscribe(C, Topic) + ); +test_acl({deny, Topic}, C) -> + ?assertMatch( + [?RC_NOT_AUTHORIZED], + send_subscribe(C, Topic) + ). + +send_subscribe(Client, Topic) -> + TopicOpts = #{nl => 0, rap => 0, rh => 0, qos => 0}, + Packet = ?SUBSCRIBE_PACKET(1, [{Topic, TopicOpts}]), + emqx_authn_mqtt_test_client:send(Client, Packet), + timer:sleep(200), + + ?SUBACK_PACKET(1, ReasonCode) = receive_packet(), + ReasonCode. diff --git a/apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_test_server.erl similarity index 98% rename from apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl rename to apps/emqx_auth_http/test/emqx_authn_scram_restapi_test_server.erl index 5467df6217d..1e1432e0bb8 100644 --- a/apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl +++ b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_test_server.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http_test_server). +-module(emqx_authn_scram_restapi_test_server). -behaviour(supervisor). -behaviour(cowboy_handler). diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl index d59afea2857..9880b71ee50 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl @@ -141,7 +141,9 @@ authenticate( reason => Reason }) end, - emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State); + emqx_utils_scram:authenticate( + AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, [is_superuser] + ); authenticate(_Credential, _State) -> ignore. diff --git a/apps/emqx_bridge_azure_event_hub/mix.exs b/apps/emqx_bridge_azure_event_hub/mix.exs index 42edddbbe87..8f5068d0ea3 100644 --- a/apps/emqx_bridge_azure_event_hub/mix.exs +++ b/apps/emqx_bridge_azure_event_hub/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 76ea7fa6c8c..c8be2a6a353 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 661b8819cdf..f2a06cf65e5 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -382,12 +382,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_dynamic_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. diff --git a/apps/emqx_bridge_confluent/mix.exs b/apps/emqx_bridge_confluent/mix.exs index 46cbe9a023d..134e924fcb8 100644 --- a/apps/emqx_bridge_confluent/mix.exs +++ b/apps/emqx_bridge_confluent/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeConfluent.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 1a91f501d07..786b1cf82a1 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index 0b3a22a992b..f10e88463f8 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -391,12 +391,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_dynamic_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. diff --git a/apps/emqx_bridge_kafka/mix.exs b/apps/emqx_bridge_kafka/mix.exs index b74b1fdd0dd..a1a59cb08cf 100644 --- a/apps/emqx_bridge_kafka/mix.exs +++ b/apps/emqx_bridge_kafka/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeKafka.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index b89c9190f29..77d9b95ef18 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 212d288ba90..377f07c59eb 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -295,6 +295,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> + %% Schema used by bridges V1. connector_config_fields() ++ producer_opts(v1); fields(kafka_producer_action) -> emqx_bridge_v2_schema:common_fields() ++ producer_opts(action); @@ -356,9 +357,33 @@ fields(socket_opts) -> validator => fun emqx_schema:validate_tcp_keepalive/1 })} ]; +fields(v1_producer_kafka_opts) -> + OldSchemaFields = + [ + topic, + message, + max_batch_bytes, + compression, + partition_strategy, + required_acks, + kafka_headers, + kafka_ext_headers, + kafka_header_value_encode_mode, + partition_count_refresh_interval, + partitions_limit, + max_inflight, + buffer, + query_mode, + sync_query_timeout + ], + Fields = fields(producer_kafka_opts), + lists:filter( + fun({K, _V}) -> lists:member(K, OldSchemaFields) end, + Fields + ); fields(producer_kafka_opts) -> [ - {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, + {topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, @@ -672,15 +697,15 @@ resource_opts() -> %% However we need to keep it backward compatible for generated schema json (version 0.1.0) %% since schema is data for the 'schemas' API. parameters_field(ActionOrBridgeV1) -> - {Name, Alias} = + {Name, Alias, Ref} = case ActionOrBridgeV1 of v1 -> - {kafka, parameters}; + {kafka, parameters, v1_producer_kafka_opts}; action -> - {parameters, kafka} + {parameters, kafka, producer_kafka_opts} end, {Name, - mk(ref(producer_kafka_opts), #{ + mk(ref(Ref), #{ required => true, aliases => [Alias], desc => ?DESC(producer_kafka_opts), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 42941868d0c..1b18a1767a8 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_kafka_impl_producer). +-feature(maybe_expr, enable). + -behaviour(emqx_resource). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -125,8 +127,8 @@ on_add_channel( {ok, NewState}. create_producers_for_bridge_v2( - InstId, - BridgeV2Id, + ConnResId, + ActionResId, ClientId, #{ bridge_type := BridgeType, @@ -135,33 +137,42 @@ create_producers_for_bridge_v2( ) -> #{ message := MessageTemplate, - topic := KafkaTopic, + topic := KafkaTopic0, sync_query_timeout := SyncQueryTimeout } = KafkaConfig, + TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0), + MKafkaTopic = + case TopicType of + fixed -> TopicOrTemplate; + dynamic -> dynamic + end, KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), - #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), - IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId), + IsDryRun = emqx_resource:is_dry_run(ActionResId), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( - BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id + BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId ), - case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of + case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), ok = emqx_resource:allocate_resource( - InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id + ConnResId, {?kafka_producers, ActionResId}, Producers + ), + ok = emqx_resource:allocate_resource( + ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId ), - _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), + _ = maybe_install_wolff_telemetry_handlers(ActionResId), {ok, #{ message_template => compile_message_template(MessageTemplate), kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + topic_template => TopicTemplate, + topic => MKafkaTopic, producers => Producers, - resource_id => BridgeV2Id, - connector_resource_id => InstId, + resource_id => ActionResId, + connector_resource_id => ConnResId, sync_query_timeout => SyncQueryTimeout, kafka_config => KafkaConfig, headers_tokens => KafkaHeadersTokens, @@ -172,9 +183,9 @@ create_producers_for_bridge_v2( {error, Reason2} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_producer", - instance_id => InstId, + instance_id => ConnResId, kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + kafka_topic => MKafkaTopic, reason => Reason2 }), throw( @@ -267,7 +278,9 @@ remove_producers_for_bridge_v2( ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id), maps:foreach( fun - ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> + ({?kafka_producers, BridgeV2IdCheck}, Producers) when + BridgeV2IdCheck =:= BridgeV2Id + -> deallocate_producers(ClientId, Producers); ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when BridgeV2IdCheck =:= BridgeV2Id @@ -300,7 +313,8 @@ on_query( #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState ) -> #{ - message_template := Template, + message_template := MessageTemplate, + topic_template := TopicTemplate, producers := Producers, sync_query_timeout := SyncTimeout, headers_tokens := KafkaHeadersTokens, @@ -313,7 +327,8 @@ on_query( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try - KafkaMessage = render_message(Template, KafkaHeaders, Message), + KafkaTopic = render_topic(TopicTemplate, Message), + KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_sync_query, #{headers_config => KafkaHeaders, instance_id => InstId} @@ -321,9 +336,15 @@ on_query( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) + do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => sync @@ -368,6 +389,7 @@ on_query_async( ) -> #{ message_template := Template, + topic_template := TopicTemplate, producers := Producers, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, @@ -379,6 +401,7 @@ on_query_async( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try + KafkaTopic = render_topic(TopicTemplate, Message), KafkaMessage = render_message(Template, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_async_query, @@ -387,9 +410,15 @@ on_query_async( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) + do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => async @@ -427,9 +456,28 @@ compile_message_template(T) -> timestamp => preproc_tmpl(TimestampTemplate) }. +maybe_preproc_topic(Topic) -> + Template = emqx_template:parse(Topic), + case emqx_template:placeholders(Template) of + [] -> + {fixed, bin(Topic)}; + [_ | _] -> + {dynamic, Template} + end. + preproc_tmpl(Tmpl) -> emqx_placeholder:preproc_tmpl(Tmpl). +render_topic({fixed, KafkaTopic}, _Message) -> + KafkaTopic; +render_topic({dynamic, Template}, Message) -> + try + iolist_to_binary(emqx_template:render_strict(Template, {emqx_jsonish, Message})) + catch + error:_Errors -> + throw(bad_topic) + end. + render_message( #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, #{ @@ -471,9 +519,11 @@ render_timestamp(Template, Message) -> erlang:system_time(millisecond) end. -do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> +do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) -> try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), + {_Partition, _Offset} = wolff:send_sync2( + Producers, KafkaTopic, [KafkaMessage], SyncTimeout + ), ok catch error:{producer_down, _} = Reason -> @@ -481,7 +531,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> error:timeout -> {error, timeout} end; -do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> +do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) -> %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes %% for counters and gauges. @@ -489,7 +539,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> %% The retuned information is discarded here. %% If the producer process is down when sending, this function would %% raise an error exception which is to be caught by the caller of this callback - {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), + {_Partition, Pid} = wolff:send2( + Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]} + ), %% this Pid is so far never used because Kafka producer is by-passing the buffer worker {ok, Pid}. @@ -512,7 +564,7 @@ on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) -> %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. on_get_status( - _InstId, + ConnResId, #{client_id := ClientId} = State ) -> %% Note: we must avoid returning `?status_disconnected' here if the connector ever was @@ -522,7 +574,7 @@ on_get_status( %% held in wolff producer's replayq. case check_client_connectivity(ClientId) of ok -> - maybe_check_health_check_topic(State); + maybe_check_health_check_topic(ConnResId, State); {error, {find_client, _Error}} -> ?status_connecting; {error, {connectivity, Error}} -> @@ -530,20 +582,23 @@ on_get_status( end. on_get_channel_status( - _ResId, - ChannelId, + _ConnResId, + ActionResId, #{ client_id := ClientId, installed_bridge_v2s := Channels - } = _State + } = _ConnState ) -> %% Note: we must avoid returning `?status_disconnected' here. Returning %% `?status_disconnected' will make resource manager try to restart the producers / %% connector, thus potentially dropping data held in wolff producer's replayq. The %% only exception is if the topic does not exist ("unhealthy target"). - #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels), + #{ + topic := MKafkaTopic, + partitions_limit := MaxPartitions + } = maps:get(ActionResId, Channels), try - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), ?status_connected catch throw:{unhealthy_target, Msg} -> @@ -552,22 +607,29 @@ on_get_channel_status( {?status_connecting, {K, E}} end. -check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) -> +check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - ok = check_topic_status(ClientId, Pid, KafkaTopic), - ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); + maybe + true ?= is_binary(MKafkaTopic), + ok = check_topic_status(ClientId, Pid, MKafkaTopic), + ok = check_if_healthy_leaders( + ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions + ) + else + false -> ok + end; {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }); {error, #{reason := client_supervisor_not_initialized}} -> throw(#{ reason => restarting, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }) end. @@ -586,21 +648,23 @@ check_client_connectivity(ClientId) -> {error, {find_client, Reason}} end. -maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when +maybe_check_health_check_topic(ConnResId, #{health_check_topic := Topic} = ConnectorState) when is_binary(Topic) -> #{client_id := ClientId} = ConnectorState, MaxPartitions = all_partitions, - try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of + try check_topic_and_leader_connections(ConnResId, ClientId, Topic, MaxPartitions) of ok -> ?status_connected catch + throw:{unhealthy_target, Msg} -> + {?status_disconnected, ConnectorState, Msg}; throw:#{reason := {connection_down, _} = Reason} -> {?status_disconnected, ConnectorState, Reason}; throw:#{reason := Reason} -> {?status_connecting, ConnectorState, Reason} end; -maybe_check_health_check_topic(_) -> +maybe_check_health_check_topic(_ConnResId, _ConnState) -> %% Cannot infer further information. Maybe upgraded from older version. ?status_connected. @@ -612,8 +676,10 @@ error_summary(Map, [Error]) -> error_summary(Map, [Error | More]) -> Map#{first_error => Error, total_errors => length(More) + 1}. -check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> - case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of +check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when + is_pid(ClientPid) +-> + case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of {ok, Leaders} -> %% Kafka is considered healthy as long as any of the partition leader is reachable. case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of @@ -675,7 +741,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> false. -producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> +producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -717,8 +783,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - alias => BridgeV2Id, - telemetry_meta_data => #{bridge_id => BridgeV2Id}, + group => ActionResId, + telemetry_meta_data => #{bridge_id => ActionResId}, max_partitions => MaxPartitions }. @@ -794,20 +860,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Note: don't use the instance/manager ID, as that changes everytime %% the bridge is recreated, and will lead to multiplication of %% metrics. --spec telemetry_handler_id(resource_id()) -> binary(). -telemetry_handler_id(ResourceID) -> - <<"emqx-bridge-kafka-producer-", ResourceID/binary>>. +-spec telemetry_handler_id(action_resource_id()) -> binary(). +telemetry_handler_id(ActionResId) -> + ActionResId. -uninstall_telemetry_handlers(ResourceID) -> - HandlerID = telemetry_handler_id(ResourceID), - telemetry:detach(HandlerID). +uninstall_telemetry_handlers(TelemetryId) -> + telemetry:detach(TelemetryId). -maybe_install_wolff_telemetry_handlers(ResourceID) -> +maybe_install_wolff_telemetry_handlers(TelemetryId) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - telemetry_handler_id(ResourceID), + telemetry_handler_id(TelemetryId), [ [wolff, dropped_queue_full], [wolff, queuing], @@ -819,7 +884,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) -> %% wolff producers; otherwise, multiple kafka producer bridges %% will install multiple handlers to the same wolff events, %% multiplying the metric counts... - #{bridge_id => ResourceID} + #{bridge_id => TelemetryId} ). preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl index d97e68ba616..b9e13e7176a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl @@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka. connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), - emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2). + BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2), + maps:update_with( + <<"kafka">>, + fun(Params) -> maps:with(v1_parameters(), Params) end, + BridgeV1Config + ). bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) -> %% Ancient v1 config, when `kafka' key was wrapped by `producer' @@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> %% Internal helper functions %%------------------------------------------------------------------------------------------ +v1_parameters() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts) + ]. + producer_action_field_keys() -> [ to_bin(K) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 24405eb9e26..74d3a5f546c 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -477,7 +477,7 @@ do_start_producer(KafkaClientId, KafkaTopic) -> ProducerConfig = #{ name => Name, - partitioner => roundrobin, + partitioner => random, partition_count_refresh_interval_seconds => 1_000, replayq_max_total_bytes => 10_000, replayq_seg_bytes => 9_000, @@ -1520,7 +1520,7 @@ t_receive_after_recovery(Config) -> key => <<"commit", (integer_to_binary(N))/binary>>, value => <<"commit", (integer_to_binary(N))/binary>> } - || N <- lists:seq(1, NPartitions) + || N <- lists:seq(1, NPartitions * 10) ], %% we do distinct passes over this producing part so that %% wolff won't batch everything together. @@ -1933,7 +1933,7 @@ t_node_joins_existing_cluster(Config) -> Val = <<"v", (integer_to_binary(N))/binary>>, publish(Config, KafkaTopic, [#{key => Key, value => Val}]) end, - lists:seq(1, NPartitions) + lists:seq(1, 10 * NPartitions) ), {ok, _} = snabbkaffe:receive_events(SRef1), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 98260a2c2b4..1db3c17252a 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/asserts.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -165,6 +166,9 @@ send_message(Type, ActionName) -> resolve_kafka_offset() -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + resolve_kafka_offset(KafkaTopic). + +resolve_kafka_offset(KafkaTopic) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( @@ -174,11 +178,32 @@ resolve_kafka_offset() -> check_kafka_message_payload(Offset, ExpectedPayload) -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload). + +check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). +ensure_kafka_topic(KafkaTopic) -> + TopicConfigs = [ + #{ + name => KafkaTopic, + num_partitions => 1, + replication_factor => 1, + assignments => [], + configs => [] + } + ], + RequestConfig = #{timeout => 5_000}, + ConnConfig = #{}, + Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of + ok -> ok; + {error, topic_already_exists} -> ok + end. + action_config(ConnectorName) -> action_config(ConnectorName, _Overrides = #{}). @@ -715,6 +740,21 @@ t_connector_health_check_topic(_Config) -> emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1) ), + %% By providing an inexistent health check topic, we should detect it's + %% disconnected without the need for an action. + ConnectorConfig2 = connector_config(#{ + <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), + <<"health_check_topic">> => <<"i-dont-exist-999">> + }), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Unknown topic or partition", _/binary>> + }}}, + emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig2) + ), + ok end, [] @@ -768,9 +808,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send_sync, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send_sync2, + fun(_Producers, _Topic, _Msgs, _Timeout) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -813,9 +857,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send2, + fun(_Producers, _Topic, _Msgs, _AckCallback) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -921,3 +969,126 @@ t_multiple_actions_sharing_topic(Config) -> end ), ok. + +%% Smoke tests for using a templated topic and adynamic kafka topics. +t_dynamic_topics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionName = <<"dynamic_topics">>, + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + PreConfiguredTopic1 = <<"pct1">>, + PreConfiguredTopic2 = <<"pct2">>, + ensure_kafka_topic(PreConfiguredTopic1), + ensure_kafka_topic(PreConfiguredTopic2), + ActionConfig = emqx_bridge_v2_testlib:parse_and_check( + action, + Type, + ActionName, + emqx_utils_maps:deep_merge( + ActionConfig1, + #{ + <<"parameters">> => #{ + <<"topic">> => <<"pct${.payload.n}">>, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.payload.p}">> + } + } + } + ) + ), + ?check_trace( + #{timetrap => 7_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionParams = [ + {action_config, ActionConfig}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + RuleTopic = <<"pct">>, + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http( + Type, + RuleTopic, + [ + {bridge_name, ActionName} + ] + ), + ?assertStatusAPI(Type, ActionName, <<"connected">>), + + HandlerId = ?FUNCTION_NAME, + TestPid = self(), + telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, + metadata => Metadata + }, + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + on_exit(fun() -> telemetry:detach(HandlerId) end), + + {ok, C} = emqtt:start_link(#{}), + {ok, _} = emqtt:connect(C), + Payload = fun(Map) -> emqx_utils_json:encode(Map) end, + Offset1 = resolve_kafka_offset(PreConfiguredTopic1), + Offset2 = resolve_kafka_offset(PreConfiguredTopic2), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]), + + check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>), + check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>), + + ActionId = emqx_bridge_v2:id(Type, ActionName), + ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)), + ?assertEqual(2, emqx_resource_metrics:success_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)), + + ?assertReceive( + {telemetry, #{ + measurements := #{gauge_set := _}, + metadata := #{worker_id := _, resource_id := ActionId} + }} + ), + + %% If there isn't enough information in the context to resolve to a topic, it + %% should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_failed_to_render_topic"} + ) + ), + + %% If it's possible to render the topic, but it isn't in the pre-configured + %% list, it should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_resolved_to_unknown_topic"} + ) + ), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index fda38ce11d9..4d0b8c1fa46 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -83,7 +83,8 @@ dropped_msg_due_to_mqueue_is_full, socket_receive_paused_by_rate_limit, data_bridge_buffer_overflow, - external_broker_crashed + external_broker_crashed, + unrecoverable_resource_error ]). -define(DEFAULT_RPC_PORT, 5369). diff --git a/apps/emqx_conf/src/emqx_conf_schema_inject.erl b/apps/emqx_conf/src/emqx_conf_schema_inject.erl index 5ffd3a5af91..9564a5915d7 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_inject.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_inject.erl @@ -63,7 +63,7 @@ authn_mods(ee) -> authn_mods(ce) ++ [ emqx_gcp_device_authn_schema, - emqx_authn_scram_http_schema + emqx_authn_scram_restapi_schema ]. authz() -> diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl index 0707c12aa7e..c79bc8e61da 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl @@ -383,7 +383,7 @@ schema_authn() -> emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type_without([ emqx_authn_scram_mnesia_schema, - emqx_authn_scram_http_schema + emqx_authn_scram_restapi_schema ]), emqx_authn_api:authenticator_examples() ). diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 37df54d3f9e..8ae211b5544 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -358,13 +358,13 @@ get_config_bin(NameVsn) -> %% RPC call from Management API or CLI. %% The plugin config Json Map was valid by avro schema %% Or: if no and plugin config ALWAYS be valid before calling this function. -put_config(NameVsn, ConfigJsonMap, AvroValue) when not is_binary(NameVsn) -> +put_config(NameVsn, ConfigJsonMap, AvroValue) when (not is_binary(NameVsn)) -> put_config(bin(NameVsn), ConfigJsonMap, AvroValue); put_config(NameVsn, ConfigJsonMap, _AvroValue) -> HoconBin = hocon_pp:do(ConfigJsonMap, #{}), ok = backup_and_write_hocon_bin(NameVsn, HoconBin), - %% TODO: callback in plugin's on_config_changed (config update by mgmt API) %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2) + ok = maybe_call_on_config_changed(NameVsn, ConfigJsonMap), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap), ok. @@ -375,6 +375,43 @@ restart(NameVsn) -> {error, Reason} -> {error, Reason} end. +%% @doc Call plugin's callback on_config_changed/2 +maybe_call_on_config_changed(NameVsn, NewConf) -> + FuncName = on_config_changed, + maybe + {ok, PluginAppModule} ?= app_module_name(NameVsn), + true ?= erlang:function_exported(PluginAppModule, FuncName, 2), + {ok, OldConf} = get_config(NameVsn), + try erlang:apply(PluginAppModule, FuncName, [OldConf, NewConf]) of + _ -> ok + catch + Class:CatchReason:Stacktrace -> + ?SLOG(error, #{ + msg => "failed_to_call_on_config_changed", + exception => Class, + reason => CatchReason, + stacktrace => Stacktrace + }), + ok + end + else + {error, Reason} -> + ?SLOG(info, #{msg => "failed_to_call_on_config_changed", reason => Reason}); + false -> + ?SLOG(info, #{msg => "on_config_changed_callback_not_exported"}); + _ -> + ok + end. + +app_module_name(NameVsn) -> + case read_plugin_info(NameVsn, #{}) of + {ok, #{<<"name">> := Name} = _PluginInfo} -> + emqx_utils:safe_to_existing_atom(<>); + {error, Reason} -> + ?SLOG(error, Reason#{msg => "failed_to_read_plugin_info"}), + {error, Reason} + end. + %% @doc List all installed plugins. %% Including the ones that are installed, but not enabled in config. -spec list() -> [plugin_info()]. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 516795f39ec..9c1b398ff23 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -298,10 +298,10 @@ running(info, {flush_metrics, _Ref}, _Data) -> running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); running(info, Info, _St) -> - ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}, #{tag => ?TAG}), keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = St0) -> @@ -331,10 +331,10 @@ blocked(info, {flush_metrics, _Ref}, _Data) -> blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); blocked(info, Info, _Data) -> - ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}, #{tag => ?TAG}), keep_state_and_data. terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> @@ -981,7 +981,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters = @@ -1021,7 +1030,16 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters = @@ -1141,12 +1159,16 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte false -> ok; true -> - ?SLOG(info, #{ - msg => "buffer_worker_dropped_expired_messages", - resource_id => Id, - worker_index => Index, - expired_count => ExpiredCount - }), + ?SLOG( + info, + #{ + msg => "buffer_worker_dropped_expired_messages", + resource_id => Id, + worker_index => Index, + expired_count => ExpiredCount + }, + #{tag => ?TAG} + ), ok end. @@ -1556,7 +1578,7 @@ handle_async_reply1( case is_expired(ExpireAt, Now) of true -> IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), - %% evalutate metrics call here since we're not inside + %% evaluate metrics call here since we're not inside %% buffer worker IsAcked andalso begin @@ -1797,12 +1819,16 @@ append_queue(Id, Index, Q, Queries) -> ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), Counters = #{dropped_queue_full => Dropped}, - ?SLOG_THROTTLE(warning, #{ - msg => data_bridge_buffer_overflow, - resource_id => Id, - worker_index => Index, - dropped => Dropped - }), + ?SLOG_THROTTLE( + warning, + #{ + msg => data_bridge_buffer_overflow, + resource_id => Id, + worker_index => Index, + dropped => Dropped + }, + #{tag => ?TAG} + ), {Items2, Q1, Counters} end, ?tp( @@ -2236,11 +2262,15 @@ adjust_batch_time(Id, RequestTTL, BatchTime0) -> BatchTime = max(0, min(BatchTime0, RequestTTL div 2)), case BatchTime =:= BatchTime0 of false -> - ?SLOG(info, #{ - id => Id, - msg => "adjusting_buffer_worker_batch_time", - new_batch_time => BatchTime - }); + ?SLOG( + info, + #{ + resource_id => Id, + msg => "adjusting_buffer_worker_batch_time", + new_batch_time => BatchTime + }, + #{tag => ?TAG} + ); true -> ok end, diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index a3d9d5ebebb..c9be8212719 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -115,11 +115,13 @@ test(#{sql := Sql, context := Context}) -> true -> %% test if the topic matches the topic filters in the rule case emqx_topic:match_any(InTopic, EventTopics) of - true -> test_rule(Sql, Select, Context, EventTopics); - false -> {error, nomatch} + true -> + test_rule(Sql, Select, Context, EventTopics); + false -> + {error, nomatch} end; false -> - case lists:member(InTopic, EventTopics) of + case emqx_topic:match_any(InTopic, EventTopics) of true -> %% the rule is for both publish and events, test it directly test_rule(Sql, Select, Context, EventTopics); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl index fee06edd177..8a866cc69d4 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl @@ -393,6 +393,38 @@ t_rule_test_smoke(_Config) -> } ], MultipleFrom = [ + #{ + expected => #{code => 200}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_publish">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => + <<"SELECT\n *\nFROM\n \"t/#\", \"$bridges/mqtt:source\" ">> + } + }, + #{ + expected => #{code => 200}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_publish">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => + <<"SELECT\n *\nFROM\n \"t/#\", \"$sources/mqtt:source\" ">> + } + }, #{ expected => #{code => 200}, input => @@ -488,6 +520,7 @@ do_t_rule_test_smoke(#{input := Input, expected := #{code := ExpectedCode}} = Ca {true, #{ expected => ExpectedCode, hint => maps:get(hint, Case, <<>>), + input => Input, got => Code, resp_body => Body }} diff --git a/apps/emqx_utils/src/emqx_utils_scram.erl b/apps/emqx_utils/src/emqx_utils_scram.erl index 9d054370362..cb11082fb9b 100644 --- a/apps/emqx_utils/src/emqx_utils_scram.erl +++ b/apps/emqx_utils/src/emqx_utils_scram.erl @@ -16,17 +16,17 @@ -module(emqx_utils_scram). --export([authenticate/6]). +-export([authenticate/7]). %%------------------------------------------------------------------------------ %% Authentication %%------------------------------------------------------------------------------ -authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, Conf) -> +authenticate(AuthMethod, AuthData, AuthCache, Conf, RetrieveFun, OnErrFun, ResultKeys) -> case ensure_auth_method(AuthMethod, AuthData, Conf) of true -> case AuthCache of #{next_step := client_final} -> - check_client_final_message(AuthData, AuthCache, Conf, OnErrFun); + check_client_final_message(AuthData, AuthCache, Conf, OnErrFun, ResultKeys); _ -> check_client_first_message(AuthData, AuthCache, Conf, RetrieveFun, OnErrFun) end; @@ -64,9 +64,7 @@ check_client_first_message( {error, not_authorized} end. -check_client_final_message( - Bin, #{is_superuser := IsSuperuser} = Cache, #{algorithm := Alg}, OnErrFun -) -> +check_client_final_message(Bin, Cache, #{algorithm := Alg}, OnErrFun, ResultKeys) -> case esasl_scram:check_client_final_message( Bin, @@ -74,7 +72,7 @@ check_client_final_message( ) of {ok, ServerFinalMessage} -> - {ok, #{is_superuser => IsSuperuser}, ServerFinalMessage}; + {ok, maps:with(ResultKeys, Cache), ServerFinalMessage}; {error, Reason} -> OnErrFun("check_client_final_message_error", Reason), {error, not_authorized} diff --git a/apps/emqx_utils/src/emqx_variform_bif.erl b/apps/emqx_utils/src/emqx_variform_bif.erl index e66b8e47dd1..09048a69755 100644 --- a/apps/emqx_utils/src/emqx_variform_bif.erl +++ b/apps/emqx_utils/src/emqx_variform_bif.erl @@ -583,7 +583,7 @@ getenv(Bin) when is_binary(Bin) -> EnvKey = ?ENV_CACHE(Bin), case persistent_term:get(EnvKey, undefined) of undefined -> - Name = erlang:binary_to_list(Bin), + Name = "EMQXVAR_" ++ erlang:binary_to_list(Bin), Result = case os:getenv(Name) of false -> diff --git a/apps/emqx_utils/test/emqx_variform_bif_tests.erl b/apps/emqx_utils/test/emqx_variform_bif_tests.erl index 36235be40d2..aa6724de5b8 100644 --- a/apps/emqx_utils/test/emqx_variform_bif_tests.erl +++ b/apps/emqx_utils/test/emqx_variform_bif_tests.erl @@ -77,5 +77,5 @@ system_test() -> EnvName = erlang:atom_to_list(?MODULE), EnvVal = erlang:atom_to_list(?FUNCTION_NAME), EnvNameBin = erlang:list_to_binary(EnvName), - os:putenv(EnvName, EnvVal), + os:putenv("EMQXVAR_" ++ EnvName, EnvVal), ?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)). diff --git a/changes/ce/feat-13507.en.md b/changes/ce/feat-13507.en.md index 115fa49a975..026cf6bf478 100644 --- a/changes/ce/feat-13507.en.md +++ b/changes/ce/feat-13507.en.md @@ -1,2 +1,4 @@ -Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables. -Note this value is immutable once loaded from the environment. +Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables with below limitations. + +- Prefix `EMQXVAR_` is added before reading from OS environment variables. i.e. `getenv('FOO_BAR')` is to read `EMQXVAR_FOO_BAR`. +- The values are immutable once loaded from the OS environment. diff --git a/changes/ce/feat-13528.en.md b/changes/ce/feat-13528.en.md new file mode 100644 index 00000000000..f761e9565f5 --- /dev/null +++ b/changes/ce/feat-13528.en.md @@ -0,0 +1 @@ +Add log throttling for data integration unrecoverable errors. diff --git a/changes/ce/feat-13548.en.md b/changes/ce/feat-13548.en.md new file mode 100644 index 00000000000..75b56cd4323 --- /dev/null +++ b/changes/ce/feat-13548.en.md @@ -0,0 +1,6 @@ +Optionally calls the `on_config_changed/2` callback function when the plugin configuration is updated via the REST API. + +This callback function is assumed to be exported by the `_app` module. +i.e: +Plugin NameVsn: `my_plugin-1.0.0` +This callback function is assumed to be `my_plugin_app:on_config_changed/2` diff --git a/changes/ce/fix-13527.en.md b/changes/ce/fix-13527.en.md new file mode 100644 index 00000000000..0c3324e41da --- /dev/null +++ b/changes/ce/fix-13527.en.md @@ -0,0 +1 @@ +Fixed an issue where running a SQL test in Rule Engine for the Message Publish event when a `$bridges/...` source was included in the `FROM` clause would always yield no results. diff --git a/changes/ee/feat-13452.en.md b/changes/ee/feat-13452.en.md new file mode 100644 index 00000000000..7b2427329be --- /dev/null +++ b/changes/ee/feat-13452.en.md @@ -0,0 +1,5 @@ +Kafka producer action's `topic` config now supports templates. + +The topics must be already created in Kafka. If a message is rendered towards a non-existing topic in Kafka (given Kafka disabled topic auto-creation), the message will fail with an unrecoverable error. Also, if a message does not contain enough information to render to the configured template (e.g.: the template is `t-${t}` and the message context does not define `t`), this message will also fail with an unrecoverable error. + +This same feature is also available for Azure Event Hubs and Confluent Platform producer integrations. diff --git a/changes/ee/feat-13504.en.md b/changes/ee/feat-13504.en.md index c9b22f4032b..acea1241a69 100644 --- a/changes/ee/feat-13504.en.md +++ b/changes/ee/feat-13504.en.md @@ -1 +1,5 @@ Added a HTTP backend for the authentication mechanism `scram`. + +Note: This is not an implementation of the RFC 7804: Salted Challenge Response HTTP Authentication Mechanism. + +This backend is an implementation of scram that uses an external web resource as a source of SCRAM authentication data, including stored key of the client, server key, and the salt. It support other authentication and authorization extension fields like HTTP auth backend, namely: `is_superuser`, `client_attrs`, `expire_at` and `acl`. diff --git a/mix.exs b/mix.exs index a14beba7f26..96bb32632c3 100644 --- a/mix.exs +++ b/mix.exs @@ -387,7 +387,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index 2a3071f2c71..b99ad56fa1f 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -69,7 +69,7 @@ producer_kafka_opts.label: """Azure Event Hubs Producer""" kafka_topic.desc: -"""Event Hubs name""" +"""Event Hubs name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Event Hubs Name""" diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 234da3e5f0a..fa933a8ec50 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -69,10 +69,10 @@ producer_kafka_opts.label: """Confluent Producer""" kafka_topic.desc: -"""Event Hub name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: -"""Event Hub Name""" +"""Kafka Topic Name""" kafka_message_timestamp.desc: """Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. 1661326462115 or '1661326462115'. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used.""" diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index f63e6f3eb31..b807c80c21f 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -81,7 +81,7 @@ producer_kafka_opts.label: """Kafka Producer""" kafka_topic.desc: -"""Kafka topic name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Kafka Topic Name"""