From 141d8144e415ae1fe0d49033eb74b1a1a9356bae Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 25 Jul 2024 10:52:26 +0800 Subject: [PATCH 01/19] fix(scram): change the name from `scram_http` to `scram_restapi` --- apps/emqx_auth_http/src/emqx_auth_http_app.erl | 2 +- ...ram_http.erl => emqx_authn_scram_restapi.erl} | 12 +++++++++--- ...a.erl => emqx_authn_scram_restapi_schema.erl} | 16 ++++++++-------- ...TE.erl => emqx_authn_scram_restapi_SUITE.erl} | 16 +++++++++------- ... => emqx_authn_scram_restapi_test_server.erl} | 2 +- apps/emqx_conf/src/emqx_conf_schema_inject.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_api_authn.erl | 2 +- changes/ee/feat-13504.en.md | 4 ++++ 8 files changed, 34 insertions(+), 22 deletions(-) rename apps/emqx_auth_http/src/{emqx_authn_scram_http.erl => emqx_authn_scram_restapi.erl} (93%) rename apps/emqx_auth_http/src/{emqx_authn_scram_http_schema.erl => emqx_authn_scram_restapi_schema.erl} (88%) rename apps/emqx_auth_http/test/{emqx_authn_scram_http_SUITE.erl => emqx_authn_scram_restapi_SUITE.erl} (96%) rename apps/emqx_auth_http/test/{emqx_authn_scram_http_test_server.erl => emqx_authn_scram_restapi_test_server.erl} (98%) diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index 3d8ae0dadd8..8b7d08c4ef2 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -25,7 +25,7 @@ start(_StartType, _StartArgs) -> ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http), ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http), - ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http), + ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_restapi), {ok, Sup} = emqx_auth_http_sup:start_link(), {ok, Sup}. 
diff --git a/apps/emqx_auth_http/src/emqx_authn_scram_http.erl b/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl similarity index 93% rename from apps/emqx_auth_http/src/emqx_authn_scram_http.erl rename to apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl index 0e6190b4b9e..f1cca5da26d 100644 --- a/apps/emqx_auth_http/src/emqx_authn_scram_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl @@ -2,7 +2,13 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http). +%% Note: +%% This is not an implementation of the RFC 7804: +%% Salted Challenge Response HTTP Authentication Mechanism. +%% This backend is an implementation of scram, +%% which uses an external web resource as a source of user information. + +-module(emqx_authn_scram_restapi). -include_lib("emqx_auth/include/emqx_authn.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -95,7 +101,7 @@ retrieve( ) -> Request = emqx_authn_http:generate_request(Credential#{username := Username}, State), Response = emqx_resource:simple_sync_query(ResourceId, {Method, Request, RequestTimeout}), - ?TRACE_AUTHN_PROVIDER("scram_http_response", #{ + ?TRACE_AUTHN_PROVIDER("scram_restapi_response", #{ request => emqx_authn_http:request_for_log(Credential, State), response => emqx_authn_http:response_for_log(Response), resource => ResourceId @@ -119,7 +125,7 @@ handle_response(Headers, Body) -> {error, Reason} = Error -> ?TRACE_AUTHN_PROVIDER( error, - "parse_scram_http_response_failed", + "parse_scram_restapi_response_failed", #{content_type => ContentType, body => Body, reason => Reason} ), Error diff --git a/apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl b/apps/emqx_auth_http/src/emqx_authn_scram_restapi_schema.erl similarity index 88% rename from apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl rename to apps/emqx_auth_http/src/emqx_authn_scram_restapi_schema.erl index 
ca43fe3a6ca..bf3398abb5e 100644 --- a/apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl +++ b/apps/emqx_auth_http/src/emqx_authn_scram_restapi_schema.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http_schema). +-module(emqx_authn_scram_restapi_schema). -behaviour(emqx_authn_schema). @@ -22,16 +22,16 @@ namespace() -> "authn". refs() -> - [?R_REF(scram_http_get), ?R_REF(scram_http_post)]. + [?R_REF(scram_restapi_get), ?R_REF(scram_restapi_post)]. select_union_member( #{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN} = Value ) -> case maps:get(<<"method">>, Value, undefined) of <<"get">> -> - [?R_REF(scram_http_get)]; + [?R_REF(scram_restapi_get)]; <<"post">> -> - [?R_REF(scramm_http_post)]; + [?R_REF(scram_restapi_post)]; Else -> throw(#{ reason => "unknown_http_method", @@ -43,20 +43,20 @@ select_union_member( select_union_member(_Value) -> undefined. -fields(scram_http_get) -> +fields(scram_restapi_get) -> [ {method, #{type => get, required => true, desc => ?DESC(emqx_authn_http_schema, method)}}, {headers, fun emqx_authn_http_schema:headers_no_content_type/1} ] ++ common_fields(); -fields(scram_http_post) -> +fields(scram_restapi_post) -> [ {method, #{type => post, required => true, desc => ?DESC(emqx_authn_http_schema, method)}}, {headers, fun emqx_authn_http_schema:headers/1} ] ++ common_fields(). -desc(scram_http_get) -> +desc(scram_restapi_get) -> ?DESC(emqx_authn_http_schema, get); -desc(scram_http_post) -> +desc(scram_restapi_post) -> ?DESC(emqx_authn_http_schema, post); desc(_) -> undefined. 
diff --git a/apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl similarity index 96% rename from apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl rename to apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl index b00212cb1c2..8cd83f9732a 100644 --- a/apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http_SUITE). +-module(emqx_authn_scram_restapi_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -54,11 +54,11 @@ init_per_testcase(_Case, Config) -> [authentication], ?GLOBAL ), - {ok, _} = emqx_authn_scram_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH), + {ok, _} = emqx_authn_scram_restapi_test_server:start_link(?HTTP_PORT, ?HTTP_PATH), Config. end_per_testcase(_Case, _Config) -> - ok = emqx_authn_scram_http_test_server:stop(). + ok = emqx_authn_scram_restapi_test_server:stop(). %%------------------------------------------------------------------------------ %% Tests @@ -72,7 +72,9 @@ t_create(_Config) -> {create_authenticator, ?GLOBAL, AuthConfig} ), - {ok, [#{provider := emqx_authn_scram_http}]} = emqx_authn_chains:list_authenticators(?GLOBAL). + {ok, [#{provider := emqx_authn_scram_restapi}]} = emqx_authn_chains:list_authenticators( + ?GLOBAL + ). 
t_create_invalid(_Config) -> AuthConfig = raw_config(), @@ -329,7 +331,7 @@ test_is_superuser(State, ExpectedIsSuperuser) -> ClientFirstMessage = esasl_scram:client_first_message(Username), {continue, ServerFirstMessage, ServerCache} = - emqx_authn_scram_http:authenticate( + emqx_authn_scram_restapi:authenticate( #{ auth_method => <<"SCRAM-SHA-512">>, auth_data => ClientFirstMessage, @@ -349,7 +351,7 @@ test_is_superuser(State, ExpectedIsSuperuser) -> ), {ok, UserInfo1, ServerFinalMessage} = - emqx_authn_scram_http:authenticate( + emqx_authn_scram_restapi:authenticate( #{ auth_method => <<"SCRAM-SHA-512">>, auth_data => ClientFinalMessage, @@ -399,7 +401,7 @@ set_user_handler(Username, Password, IsSuperuser) -> ), {ok, Req, State} end, - ok = emqx_authn_scram_http_test_server:set_handler(Handler). + ok = emqx_authn_scram_restapi_test_server:set_handler(Handler). init_auth() -> init_auth(raw_config()). diff --git a/apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_test_server.erl similarity index 98% rename from apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl rename to apps/emqx_auth_http/test/emqx_authn_scram_restapi_test_server.erl index 5467df6217d..1e1432e0bb8 100644 --- a/apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl +++ b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_test_server.erl @@ -2,7 +2,7 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_authn_scram_http_test_server). +-module(emqx_authn_scram_restapi_test_server). -behaviour(supervisor). -behaviour(cowboy_handler). 
diff --git a/apps/emqx_conf/src/emqx_conf_schema_inject.erl b/apps/emqx_conf/src/emqx_conf_schema_inject.erl index 5c155bbf55c..d9465732524 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_inject.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_inject.erl @@ -51,7 +51,7 @@ authn_mods(ee) -> authn_mods(ce) ++ [ emqx_gcp_device_authn_schema, - emqx_authn_scram_http_schema + emqx_authn_scram_restapi_schema ]. authz() -> diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl index 0707c12aa7e..c79bc8e61da 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl @@ -383,7 +383,7 @@ schema_authn() -> emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type_without([ emqx_authn_scram_mnesia_schema, - emqx_authn_scram_http_schema + emqx_authn_scram_restapi_schema ]), emqx_authn_api:authenticator_examples() ). diff --git a/changes/ee/feat-13504.en.md b/changes/ee/feat-13504.en.md index c9b22f4032b..20b3aa1e22d 100644 --- a/changes/ee/feat-13504.en.md +++ b/changes/ee/feat-13504.en.md @@ -1 +1,5 @@ Added a HTTP backend for the authentication mechanism `scram`. + +Note: This is not an implementation of the RFC 7804: Salted Challenge Response HTTP Authentication Mechanism. + +This backend is an implementation of scram that uses an external web resource as a source of user information. 
From 33eccb35dacde59a549d9310a2ed2e70cfa6b6f3 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 Jul 2024 15:49:58 -0300 Subject: [PATCH 02/19] chore: update wolff -> 3.0.2 --- apps/emqx_bridge_azure_event_hub/mix.exs | 2 +- apps/emqx_bridge_azure_event_hub/rebar.config | 2 +- apps/emqx_bridge_confluent/mix.exs | 2 +- apps/emqx_bridge_confluent/rebar.config | 2 +- apps/emqx_bridge_kafka/mix.exs | 2 +- apps/emqx_bridge_kafka/rebar.config | 2 +- .../test/emqx_bridge_kafka_impl_consumer_SUITE.erl | 6 +++--- mix.exs | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/mix.exs b/apps/emqx_bridge_azure_event_hub/mix.exs index 42edddbbe87..8f5068d0ea3 100644 --- a/apps/emqx_bridge_azure_event_hub/mix.exs +++ b/apps/emqx_bridge_azure_event_hub/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 76ea7fa6c8c..c8be2a6a353 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. 
{deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/mix.exs b/apps/emqx_bridge_confluent/mix.exs index 46cbe9a023d..134e924fcb8 100644 --- a/apps/emqx_bridge_confluent/mix.exs +++ b/apps/emqx_bridge_confluent/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeConfluent.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 1a91f501d07..786b1cf82a1 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. 
{deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/mix.exs b/apps/emqx_bridge_kafka/mix.exs index b74b1fdd0dd..a1a59cb08cf 100644 --- a/apps/emqx_bridge_kafka/mix.exs +++ b/apps/emqx_bridge_kafka/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeKafka.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index b89c9190f29..77d9b95ef18 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. 
{deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 56aabb1c31f..9119ee6c460 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -477,7 +477,7 @@ do_start_producer(KafkaClientId, KafkaTopic) -> ProducerConfig = #{ name => Name, - partitioner => roundrobin, + partitioner => random, partition_count_refresh_interval_seconds => 1_000, replayq_max_total_bytes => 10_000, replayq_seg_bytes => 9_000, @@ -1520,7 +1520,7 @@ t_receive_after_recovery(Config) -> key => <<"commit", (integer_to_binary(N))/binary>>, value => <<"commit", (integer_to_binary(N))/binary>> } - || N <- lists:seq(1, NPartitions) + || N <- lists:seq(1, NPartitions * 10) ], %% we do distinct passes over this producing part so that %% wolff won't batch everything together. 
@@ -1933,7 +1933,7 @@ t_node_joins_existing_cluster(Config) -> Val = <<"v", (integer_to_binary(N))/binary>>, publish(Config, KafkaTopic, [#{key => Key, value => Val}]) end, - lists:seq(1, NPartitions) + lists:seq(1, 10 * NPartitions) ), {ok, _} = snabbkaffe:receive_events(SRef1), diff --git a/mix.exs b/mix.exs index 53e5b304fb2..399c996a6d0 100644 --- a/mix.exs +++ b/mix.exs @@ -361,7 +361,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, From df1f4fad7009cc88990491b4b346ac15d07cf7ca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 10 Jul 2024 18:20:05 -0300 Subject: [PATCH 03/19] feat(kafka producer): allow dynamic topics from pre-configured topics Fixes https://emqx.atlassian.net/browse/EMQX-12656 --- .../emqx_bridge_azure_event_hub_v2_SUITE.erl | 34 +++ .../emqx_bridge_confluent_producer_SUITE.erl | 34 +++ .../src/emqx_bridge_kafka.erl | 45 +++- .../src/emqx_bridge_kafka_impl_producer.erl | 178 +++++++++---- ...emqx_bridge_kafka_producer_action_info.erl | 13 +- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 238 +++++++++++++++++- changes/ee/feat-13452.en.md | 1 + rel/i18n/emqx_bridge_azure_event_hub.hocon | 12 +- rel/i18n/emqx_bridge_confluent_producer.hocon | 14 +- rel/i18n/emqx_bridge_kafka.hocon | 11 +- 10 files changed, 518 insertions(+), 62 deletions(-) create mode 100644 changes/ee/feat-13452.en.md diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 
661b8819cdf..0136ec5689c 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -391,3 +391,37 @@ t_multiple_actions_sharing_topic(Config) -> ] ), ok. + +t_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_templated_topic_and_no_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index 0b3a22a992b..de92b9327c9 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -400,3 +400,37 @@ t_multiple_actions_sharing_topic(Config) -> ] ), ok. 
+ +t_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_templated_topic_and_no_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 83bc3326660..9a2fa91cf36 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -295,6 +295,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> + %% Schema used by bridges V1. 
connector_config_fields() ++ producer_opts(v1); fields(kafka_producer_action) -> [ @@ -306,6 +307,10 @@ fields(kafka_producer_action) -> {tags, emqx_schema:tags_schema()}, {description, emqx_schema:description_schema()} ] ++ producer_opts(action); +fields(pre_configured_topic) -> + [ + {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})} + ]; fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -364,9 +369,41 @@ fields(socket_opts) -> validator => fun emqx_schema:validate_tcp_keepalive/1 })} ]; +fields(v1_producer_kafka_opts) -> + OldSchemaFields = + [ + topic, + message, + max_batch_bytes, + compression, + partition_strategy, + required_acks, + kafka_headers, + kafka_ext_headers, + kafka_header_value_encode_mode, + partition_count_refresh_interval, + partitions_limit, + max_inflight, + buffer, + query_mode, + sync_query_timeout + ], + Fields = fields(producer_kafka_opts), + lists:filter( + fun({K, _V}) -> lists:member(K, OldSchemaFields) end, + Fields + ); fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, + {pre_configured_topics, + mk( + hoconsc:array(ref(pre_configured_topic)), + #{ + default => [], + desc => ?DESC("producer_pre_configured_topics") + } + )}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, @@ -675,15 +712,15 @@ resource_opts() -> %% However we need to keep it backward compatible for generated schema json (version 0.1.0) %% since schema is data for the 'schemas' API. 
parameters_field(ActionOrBridgeV1) -> - {Name, Alias} = + {Name, Alias, Ref} = case ActionOrBridgeV1 of v1 -> - {kafka, parameters}; + {kafka, parameters, v1_producer_kafka_opts}; action -> - {parameters, kafka} + {parameters, kafka, producer_kafka_opts} end, {Name, - mk(ref(producer_kafka_opts), #{ + mk(ref(Ref), #{ required => true, aliases => [Alias], desc => ?DESC(producer_kafka_opts), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6d88a329e09..80de9840209 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -122,8 +122,8 @@ on_add_channel( {ok, NewState}. create_producers_for_bridge_v2( - InstId, - BridgeV2Id, + ConnResId, + ActionResId, ClientId, #{ bridge_type := BridgeType, @@ -132,33 +132,57 @@ create_producers_for_bridge_v2( ) -> #{ message := MessageTemplate, - topic := KafkaTopic, + pre_configured_topics := PreConfiguredTopics0, + topic := KafkaTopic0, sync_query_timeout := SyncQueryTimeout } = KafkaConfig, + TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0), + PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0], + KafkaTopics0 = + case TopicType of + fixed -> + [KafkaTopic | PreConfiguredTopics]; + dynamic -> + PreConfiguredTopics + end, + case KafkaTopics0 of + [] -> + throw(<< + "Either the Kafka topic must be fixed (not a template)," + " or at least one pre-defined topic must be set." 
+ >>); + _ -> + ok + end, + KafkaTopics = lists:map(fun bin/1, KafkaTopics0), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), - #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), - IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId), + IsDryRun = emqx_resource:is_dry_run(ActionResId), + [AKafkaTopic | _] = KafkaTopics, + ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( - BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id + BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId ), - case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of + case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), ok = emqx_resource:allocate_resource( - InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id + ConnResId, {?kafka_producers, ActionResId}, Producers ), - _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), + ok = emqx_resource:allocate_resource( + ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId + ), + _ = maybe_install_wolff_telemetry_handlers(ActionResId), {ok, #{ message_template => compile_message_template(MessageTemplate), kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + topic_template => TopicTemplate, + pre_configured_topics => KafkaTopics, producers => Producers, - resource_id => BridgeV2Id, - connector_resource_id => InstId, + resource_id => 
ActionResId, + connector_resource_id => ConnResId, sync_query_timeout => SyncQueryTimeout, kafka_config => KafkaConfig, headers_tokens => KafkaHeadersTokens, @@ -169,7 +193,7 @@ create_producers_for_bridge_v2( {error, Reason2} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_producer", - instance_id => InstId, + instance_id => ConnResId, kafka_client_id => ClientId, kafka_topic => KafkaTopic, reason => Reason2 @@ -264,7 +288,9 @@ remove_producers_for_bridge_v2( ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id), maps:foreach( fun - ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> + ({?kafka_producers, BridgeV2IdCheck}, Producers) when + BridgeV2IdCheck =:= BridgeV2Id + -> deallocate_producers(ClientId, Producers); ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when BridgeV2IdCheck =:= BridgeV2Id @@ -297,8 +323,10 @@ on_query( #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState ) -> #{ - message_template := Template, + message_template := MessageTemplate, + topic_template := TopicTemplate, producers := Producers, + pre_configured_topics := PreConfiguredTopics, sync_query_timeout := SyncTimeout, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, @@ -310,7 +338,14 @@ on_query( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try - KafkaMessage = render_message(Template, KafkaHeaders, Message), + KafkaTopic = render_topic(TopicTemplate, Message), + case lists:member(KafkaTopic, PreConfiguredTopics) of + false -> + throw({unknown_topic, KafkaTopic}); + true -> + ok + end, + KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_sync_query, #{headers_config => KafkaHeaders, instance_id => InstId} @@ -318,9 +353,15 @@ on_query( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) + do_send_msg(sync, KafkaTopic, KafkaMessage, 
Producers, SyncTimeout) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:{unknown_topic, Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => sync @@ -365,7 +406,9 @@ on_query_async( ) -> #{ message_template := Template, + topic_template := TopicTemplate, producers := Producers, + pre_configured_topics := PreConfiguredTopics, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode @@ -376,6 +419,13 @@ on_query_async( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try + KafkaTopic = render_topic(TopicTemplate, Message), + case lists:member(KafkaTopic, PreConfiguredTopics) of + false -> + throw({unknown_topic, KafkaTopic}); + true -> + ok + end, KafkaMessage = render_message(Template, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_async_query, @@ -384,9 +434,15 @@ on_query_async( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) + do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:{unknown_topic, Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => 
async @@ -424,9 +480,28 @@ compile_message_template(T) -> timestamp => preproc_tmpl(TimestampTemplate) }. +maybe_preproc_topic(Topic) -> + Template = emqx_template:parse(Topic), + case emqx_template:placeholders(Template) of + [] -> + {fixed, bin(Topic)}; + [_ | _] -> + {dynamic, Template} + end. + preproc_tmpl(Tmpl) -> emqx_placeholder:preproc_tmpl(Tmpl). +render_topic({fixed, KafkaTopic}, _Message) -> + KafkaTopic; +render_topic({dynamic, Template}, Message) -> + try + iolist_to_binary(emqx_template:render_strict(Template, Message)) + catch + error:_Errors -> + throw(bad_topic) + end. + render_message( #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, #{ @@ -468,9 +543,11 @@ render_timestamp(Template, Message) -> erlang:system_time(millisecond) end. -do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> +do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) -> try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), + {_Partition, _Offset} = wolff:send_sync2( + Producers, KafkaTopic, [KafkaMessage], SyncTimeout + ), ok catch error:{producer_down, _} = Reason -> @@ -478,7 +555,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> error:timeout -> {error, timeout} end; -do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> +do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) -> %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes %% for counters and gauges. @@ -486,7 +563,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> %% The retuned information is discarded here. 
%% If the producer process is down when sending, this function would %% raise an error exception which is to be caught by the caller of this callback - {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), + {_Partition, Pid} = wolff:send2( + Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]} + ), %% this Pid is so far never used because Kafka producer is by-passing the buffer worker {ok, Pid}. @@ -527,20 +606,24 @@ on_get_status( end. on_get_channel_status( - _ResId, - ChannelId, + _ConnResId, + ActionResId, #{ client_id := ClientId, installed_bridge_v2s := Channels - } = _State + } = _ConnState ) -> %% Note: we must avoid returning `?status_disconnected' here. Returning %% `?status_disconnected' will make resource manager try to restart the producers / %% connector, thus potentially dropping data held in wolff producer's replayq. The %% only exception is if the topic does not exist ("unhealthy target"). - #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels), + #{ + pre_configured_topics := PreConfiguredTopics, + partitions_limit := MaxPartitions + } = maps:get(ActionResId, Channels), + [KafkaTopic | _] = PreConfiguredTopics, try - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions), ?status_connected catch throw:{unhealthy_target, Msg} -> @@ -549,11 +632,11 @@ on_get_channel_status( {?status_connecting, {K, E}} end. 
-check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) -> +check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> ok = check_topic_status(ClientId, Pid, KafkaTopic), - ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); + ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions); {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, @@ -591,8 +674,10 @@ error_summary(Map, [Error]) -> error_summary(Map, [Error | More]) -> Map#{first_error => Error, total_errors => length(More) + 1}. -check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> - case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of +check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when + is_pid(ClientPid) +-> + case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of {ok, Leaders} -> %% Kafka is considered healthy as long as any of the partition leader is reachable. case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of @@ -654,7 +739,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> false. -producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> +producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -696,8 +781,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - alias => BridgeV2Id, - telemetry_meta_data => #{bridge_id => BridgeV2Id}, + group => ActionResId, + telemetry_meta_data => #{bridge_id => ActionResId}, max_partitions => MaxPartitions }. 
@@ -773,20 +858,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Note: don't use the instance/manager ID, as that changes everytime %% the bridge is recreated, and will lead to multiplication of %% metrics. --spec telemetry_handler_id(resource_id()) -> binary(). -telemetry_handler_id(ResourceID) -> - <<"emqx-bridge-kafka-producer-", ResourceID/binary>>. +-spec telemetry_handler_id(action_resource_id()) -> binary(). +telemetry_handler_id(ActionResId) -> + <<"emqx-bridge-kafka-producer-", ActionResId/binary>>. -uninstall_telemetry_handlers(ResourceID) -> - HandlerID = telemetry_handler_id(ResourceID), - telemetry:detach(HandlerID). +uninstall_telemetry_handlers(TelemetryId) -> + telemetry:detach(TelemetryId). -maybe_install_wolff_telemetry_handlers(ResourceID) -> +maybe_install_wolff_telemetry_handlers(TelemetryId) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - telemetry_handler_id(ResourceID), + telemetry_handler_id(TelemetryId), [ [wolff, dropped_queue_full], [wolff, queuing], @@ -798,7 +882,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) -> %% wolff producers; otherwise, multiple kafka producer bridges %% will install multiple handlers to the same wolff events, %% multiplying the metric counts... - #{bridge_id => ResourceID} + #{bridge_id => TelemetryId} ). preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl index d97e68ba616..b9e13e7176a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl @@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka. 
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), - emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2). + BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2), + maps:update_with( + <<"kafka">>, + fun(Params) -> maps:with(v1_parameters(), Params) end, + BridgeV1Config + ). bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) -> %% Ancient v1 config, when `kafka' key was wrapped by `producer' @@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> %% Internal helper functions %%------------------------------------------------------------------------------------------ +v1_parameters() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts) + ]. + producer_action_field_keys() -> [ to_bin(K) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index c26f5e94ee3..6246faaf12b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/asserts.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -165,6 +166,9 @@ send_message(Type, ActionName) -> resolve_kafka_offset() -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + resolve_kafka_offset(KafkaTopic). 
+ +resolve_kafka_offset(KafkaTopic) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( @@ -174,11 +178,32 @@ resolve_kafka_offset() -> check_kafka_message_payload(Offset, ExpectedPayload) -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload). + +check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). +ensure_kafka_topic(KafkaTopic) -> + TopicConfigs = [ + #{ + name => KafkaTopic, + num_partitions => 1, + replication_factor => 1, + assignments => [], + configs => [] + } + ], + RequestConfig = #{timeout => 5_000}, + ConnConfig = #{}, + Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of + ok -> ok; + {error, topic_already_exists} -> ok + end. + action_config(ConnectorName) -> action_config(ConnectorName, _Overrides = #{}). 
@@ -728,9 +753,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send_sync, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send_sync2, + fun(_Producers, _Topic, _Msgs, _Timeout) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -773,9 +802,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send2, + fun(_Producers, _Topic, _Msgs, _AckCallback) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -881,3 +914,196 @@ t_multiple_actions_sharing_topic(Config) -> end ), ok. + +%% Smoke tests for using a templated topic and a list of pre-configured kafka topics. 
+t_pre_configured_topics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionName = <<"pre_configured_topics">>, + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + PreConfigureTopic1 = <<"pct1">>, + PreConfigureTopic2 = <<"pct2">>, + ensure_kafka_topic(PreConfigureTopic1), + ensure_kafka_topic(PreConfigureTopic2), + ActionConfig = emqx_bridge_v2_testlib:parse_and_check( + action, + Type, + ActionName, + emqx_utils_maps:deep_merge( + ActionConfig1, + #{ + <<"parameters">> => #{ + <<"topic">> => <<"pct${.payload.n}">>, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.payload.p}">> + }, + <<"pre_configured_topics">> => [ + #{<<"topic">> => PreConfigureTopic1}, + #{<<"topic">> => PreConfigureTopic2} + ] + } + } + ) + ), + ?check_trace( + #{timetrap => 7_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionParams = [ + {action_config, ActionConfig}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + RuleTopic = <<"pct">>, + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http( + Type, + RuleTopic, + [ + {bridge_name, ActionName} + ], + #{ + sql => + <<"select *, json_decode(payload) as payload from \"", RuleTopic/binary, + "\" ">> + } + ), + ?assertStatusAPI(Type, ActionName, <<"connected">>), + + HandlerId = ?FUNCTION_NAME, + TestPid = self(), + telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, 
+ metadata => Metadata + }, + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + on_exit(fun() -> telemetry:detach(HandlerId) end), + + {ok, C} = emqtt:start_link(#{}), + {ok, _} = emqtt:connect(C), + Payload = fun(Map) -> emqx_utils_json:encode(Map) end, + Offset1 = resolve_kafka_offset(PreConfigureTopic1), + Offset2 = resolve_kafka_offset(PreConfigureTopic2), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]), + + check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>), + check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>), + + ActionId = emqx_bridge_v2:id(Type, ActionName), + ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)), + ?assertEqual(2, emqx_resource_metrics:success_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)), + + ?assertReceive( + {telemetry, #{ + measurements := #{gauge_set := _}, + metadata := #{worker_id := _, resource_id := ActionId} + }} + ), + + %% If there isn't enough information in the context to resolve to a topic, it + %% should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_failed_to_render_topic"} + ) + ), + + %% If it's possible to render the topic, but it isn't in the pre-configured + %% list, it should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_resolved_to_unknown_topic"} + ) + ), + + ok + end, + [] + ), + ok. + +%% Checks that creating an action with templated topic and no pre-configured kafka topics +%% throws. 
+t_templated_topic_and_no_pre_configured_topics(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+    ActionName = <<"bad_pre_configured_topics">>,
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"topic">> => <<"pct${.payload.n}">>,
+                    <<"pre_configured_topics">> => []
+                }
+            }
+        )
+    ),
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionParams = [
+                {action_config, ActionConfig},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+
+            ?assertMatch(
+                {ok,
+                    {{_, 200, _}, _, #{
+                        <<"status_reason">> :=
+                            <<
+                                "Either the Kafka topic must be fixed (not a template),"
+                                " or at least one pre-defined topic must be set."
+                            >>,
+                        <<"status">> := <<"disconnected">>,
+                        <<"node_status">> := [#{<<"status">> := <<"disconnected">>}]
+                    }}},
+                emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName)
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
diff --git a/changes/ee/feat-13452.en.md b/changes/ee/feat-13452.en.md
new file mode 100644
index 00000000000..95dae8d32ed
--- /dev/null
+++ b/changes/ee/feat-13452.en.md
@@ -0,0 +1 @@
+Added the possibility to configure a list of predefined Kafka topics for Kafka producer actions, and also to use templates to define the destination Kafka topic.
diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index 3b96e23e684..e683bc9e97d 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -69,7 +69,7 @@ producer_kafka_opts.label: """Azure Event Hubs Producer""" kafka_topic.desc: -"""Event Hubs name""" +"""Event Hubs name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Event Hubs Name""" @@ -350,4 +350,14 @@ Setting this to a value which is greater than the total number of partitions in partitions_limit.label: """Max Partitions""" +producer_pre_configured_topics.label: +"""Pre-configured Event Hubs""" +producer_pre_configured_topics.desc: +"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Event Hubs Name""" +pre_configured_topic.desc: +"""Event Hubs name""" + } diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 7483736914a..81c2c0a89be 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -69,10 +69,10 @@ producer_kafka_opts.label: """Confluent Producer""" kafka_topic.desc: -"""Event Hub name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: -"""Event Hub Name""" +"""Kafka Topic Name""" kafka_message_timestamp.desc: """Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. 1661326462115 or '1661326462115'. 
When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used.""" @@ -350,4 +350,14 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_pre_configured_topics.label: +"""Pre-configured Topics""" +producer_pre_configured_topics.desc: +"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Kafka Topic Name""" +pre_configured_topic.desc: +"""Kafka topic name""" + } diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 6e0074dddf2..59896cc224a 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -81,7 +81,7 @@ producer_kafka_opts.label: """Kafka Producer""" kafka_topic.desc: -"""Kafka topic name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Kafka Topic Name""" @@ -446,5 +446,14 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_pre_configured_topics.label: +"""Pre-configured Topics""" +producer_pre_configured_topics.desc: +"""A list of pre-configured topics to be used when using templates to define outgoing topics. 
If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Kafka Topic Name""" +pre_configured_topic.desc: +"""Kafka topic name""" } From 7bf70aaab6feea69424c9df8fa40fc4fc276de02 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 25 Jul 2024 17:00:20 +0800 Subject: [PATCH 04/19] feat(scram): supports ACL rules in `scram_restapi` backend --- .../emqx_auth_http/include/emqx_auth_http.hrl | 2 + apps/emqx_auth_http/src/emqx_authn_http.erl | 67 +++--- .../src/emqx_authn_scram_restapi.erl | 54 ++--- .../test/emqx_authn_scram_restapi_SUITE.erl | 191 ++++++++++++------ .../src/emqx_authn_scram_mnesia.erl | 4 +- apps/emqx_utils/src/emqx_utils_scram.erl | 12 +- 6 files changed, 190 insertions(+), 140 deletions(-) diff --git a/apps/emqx_auth_http/include/emqx_auth_http.hrl b/apps/emqx_auth_http/include/emqx_auth_http.hrl index c0bfa217788..439087e9c48 100644 --- a/apps/emqx_auth_http/include/emqx_auth_http.hrl +++ b/apps/emqx_auth_http/include/emqx_auth_http.hrl @@ -31,4 +31,6 @@ -define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}). -define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}). +-define(AUTHN_DATA_FIELDS, [is_superuser, client_attrs, expire_at, acl]). + -endif. diff --git a/apps/emqx_auth_http/src/emqx_authn_http.erl b/apps/emqx_auth_http/src/emqx_authn_http.erl index b294de24f6f..edaa0ee5acc 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http.erl @@ -32,7 +32,9 @@ with_validated_config/2, generate_request/2, request_for_log/2, - response_for_log/1 + response_for_log/1, + extract_auth_data/2, + safely_parse_body/2 ]). 
%%------------------------------------------------------------------------------ @@ -209,34 +211,14 @@ handle_response(Headers, Body) -> case safely_parse_body(ContentType, Body) of {ok, NBody} -> body_to_auth_data(NBody); - {error, Reason} -> - ?TRACE_AUTHN_PROVIDER( - error, - "parse_http_response_failed", - #{content_type => ContentType, body => Body, reason => Reason} - ), + {error, _Reason} -> ignore end. body_to_auth_data(Body) -> case maps:get(<<"result">>, Body, <<"ignore">>) of <<"allow">> -> - IsSuperuser = emqx_authn_utils:is_superuser(Body), - Attrs = emqx_authn_utils:client_attrs(Body), - try - ExpireAt = expire_at(Body), - ACL = acl(ExpireAt, Body), - Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]), - {ok, Result} - catch - throw:{bad_acl_rule, Reason} -> - %% it's a invalid token, so ok to log - ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}), - {error, bad_username_or_password}; - throw:Reason -> - ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}), - {error, bad_username_or_password} - end; + extract_auth_data(http, Body); <<"deny">> -> {error, not_authorized}; <<"ignore">> -> @@ -245,6 +227,24 @@ body_to_auth_data(Body) -> ignore end. +extract_auth_data(Source, Body) -> + IsSuperuser = emqx_authn_utils:is_superuser(Body), + Attrs = emqx_authn_utils:client_attrs(Body), + try + ExpireAt = expire_at(Body), + ACL = acl(ExpireAt, Source, Body), + Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]), + {ok, Result} + catch + throw:{bad_acl_rule, Reason} -> + %% it's a invalid token, so ok to log + ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}), + {error, bad_username_or_password}; + throw:Reason -> + ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}), + {error, bad_username_or_password} + end. + merge_maps([]) -> #{}; merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)). 
@@ -283,40 +283,43 @@ expire_sec(#{<<"expire_at">> := _}) -> expire_sec(_) -> undefined. -acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) -> +acl(#{expire_at := ExpireTimeMs}, Source, #{<<"acl">> := Rules}) -> #{ acl => #{ - source_for_logging => http, + source_for_logging => Source, rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules), %% It's seconds level precision (like JWT) for authz %% see emqx_authz_client_info:check/1 expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second) } }; -acl(_NoExpire, #{<<"acl">> := Rules}) -> +acl(_NoExpire, Source, #{<<"acl">> := Rules}) -> #{ acl => #{ - source_for_logging => http, + source_for_logging => Source, rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules) } }; -acl(_, _) -> +acl(_, _, _) -> #{}. safely_parse_body(ContentType, Body) -> try parse_body(ContentType, Body) catch - _Class:_Reason -> + _Class:Reason -> + ?TRACE_AUTHN_PROVIDER( + error, + "parse_http_response_failed", + #{content_type => ContentType, body => Body, reason => Reason} + ), {error, invalid_body} end. parse_body(<<"application/json", _/binary>>, Body) -> {ok, emqx_utils_json:decode(Body, [return_maps])}; parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) -> - Flags = [<<"result">>, <<"is_superuser">>], - RawMap = maps:from_list(cow_qs:parse_qs(Body)), - NBody = maps:with(Flags, RawMap), + NBody = maps:from_list(cow_qs:parse_qs(Body)), {ok, NBody}; parse_body(ContentType, _) -> {error, {unsupported_content_type, ContentType}}. diff --git a/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl b/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl index f1cca5da26d..abb91f13052 100644 --- a/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl +++ b/apps/emqx_auth_http/src/emqx_authn_scram_restapi.erl @@ -10,8 +10,11 @@ -module(emqx_authn_scram_restapi). --include_lib("emqx_auth/include/emqx_authn.hrl"). +-feature(maybe_expr, enable). + +-include("emqx_auth_http.hrl"). 
-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_auth/include/emqx_authn.hrl"). -behaviour(emqx_authn_provider). @@ -28,10 +31,6 @@ <<"salt">> ]). --define(OPTIONAL_USER_INFO_KEYS, [ - <<"is_superuser">> -]). - %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -78,7 +77,9 @@ authenticate( reason => Reason }) end, - emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State); + emqx_utils_scram:authenticate( + AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, ?AUTHN_DATA_FIELDS + ); authenticate(_Credential, _State) -> ignore. @@ -119,16 +120,11 @@ retrieve( handle_response(Headers, Body) -> ContentType = proplists:get_value(<<"content-type">>, Headers), - case safely_parse_body(ContentType, Body) of - {ok, NBody} -> - body_to_user_info(NBody); - {error, Reason} = Error -> - ?TRACE_AUTHN_PROVIDER( - error, - "parse_scram_restapi_response_failed", - #{content_type => ContentType, body => Body, reason => Reason} - ), - Error + maybe + {ok, NBody} ?= emqx_authn_http:safely_parse_body(ContentType, Body), + {ok, UserInfo} ?= body_to_user_info(NBody), + {ok, AuthData} ?= emqx_authn_http:extract_auth_data(scram_restapi, NBody), + {ok, maps:merge(AuthData, UserInfo)} end. 
body_to_user_info(Body) -> @@ -137,26 +133,16 @@ body_to_user_info(Body) -> true -> case safely_convert_hex(Required0) of {ok, Required} -> - UserInfo0 = maps:merge(Required, maps:with(?OPTIONAL_USER_INFO_KEYS, Body)), - UserInfo1 = emqx_utils_maps:safe_atom_key_map(UserInfo0), - UserInfo = maps:merge(#{is_superuser => false}, UserInfo1), - {ok, UserInfo}; + {ok, emqx_utils_maps:safe_atom_key_map(Required)}; Error -> + ?TRACE_AUTHN_PROVIDER("decode_keys_failed", #{http_body => Body}), Error end; _ -> - ?TRACE_AUTHN_PROVIDER("bad_response_body", #{http_body => Body}), + ?TRACE_AUTHN_PROVIDER("missing_requried_keys", #{http_body => Body}), {error, bad_response} end. -safely_parse_body(ContentType, Body) -> - try - parse_body(ContentType, Body) - catch - _Class:_Reason -> - {error, invalid_body} - end. - safely_convert_hex(Required) -> try {ok, @@ -171,15 +157,5 @@ safely_convert_hex(Required) -> {error, Reason} end. -parse_body(<<"application/json", _/binary>>, Body) -> - {ok, emqx_utils_json:decode(Body, [return_maps])}; -parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) -> - Flags = ?REQUIRED_USER_INFO_KEYS ++ ?OPTIONAL_USER_INFO_KEYS, - RawMap = maps:from_list(cow_qs:parse_qs(Body)), - NBody = maps:with(Flags, RawMap), - {ok, NBody}; -parse_body(ContentType, _) -> - {error, {unsupported_content_type, ContentType}}. - merge_scram_conf(Conf, State) -> maps:merge(maps:with([algorithm, iteration_count], Conf), State). diff --git a/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl index 8cd83f9732a..7963cf1e38b 100644 --- a/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl @@ -21,6 +21,9 @@ -define(ALGORITHM_STR, <<"sha512">>). -define(ITERATION_COUNT, 4096). +-define(T_ACL_USERNAME, <<"username">>). +-define(T_ACL_PASSWORD, <<"password">>). + -include_lib("emqx/include/emqx_placeholder.hrl"). 
all() -> @@ -120,59 +123,8 @@ t_authenticate(_Config) -> ok = emqx_config:put([mqtt, idle_timeout], 500), - {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), - - ClientFirstMessage = esasl_scram:client_first_message(Username), - - ConnectPacket = ?CONNECT_PACKET( - #mqtt_packet_connect{ - proto_ver = ?MQTT_PROTO_V5, - properties = #{ - 'Authentication-Method' => <<"SCRAM-SHA-512">>, - 'Authentication-Data' => ClientFirstMessage - } - } - ), - - ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), - - %% Intentional sleep to trigger idle timeout for the connection not yet authenticated - ok = ct:sleep(1000), - - ?AUTH_PACKET( - ?RC_CONTINUE_AUTHENTICATION, - #{'Authentication-Data' := ServerFirstMessage} - ) = receive_packet(), - - {continue, ClientFinalMessage, ClientCache} = - esasl_scram:check_server_first_message( - ServerFirstMessage, - #{ - client_first_message => ClientFirstMessage, - password => Password, - algorithm => ?ALGORITHM - } - ), - - AuthContinuePacket = ?AUTH_PACKET( - ?RC_CONTINUE_AUTHENTICATION, - #{ - 'Authentication-Method' => <<"SCRAM-SHA-512">>, - 'Authentication-Data' => ClientFinalMessage - } - ), - - ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket), - - ?CONNACK_PACKET( - ?RC_SUCCESS, - _, - #{'Authentication-Data' := ServerFinalMessage} - ) = receive_packet(), - - ok = esasl_scram:check_server_final_message( - ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM} - ). + {ok, Pid} = create_connection(Username, Password), + emqx_authn_mqtt_test_client:stop(Pid). t_authenticate_bad_props(_Config) -> Username = <<"u">>, @@ -316,6 +268,47 @@ t_destroy(_Config) -> _ ) = receive_packet(). 
+t_acl(_Config) -> + init_auth(), + + ACL = emqx_authn_http_SUITE:acl_rules(), + set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{acl => ACL}), + {ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD), + + Cases = [ + {allow, <<"http-authn-acl/#">>}, + {deny, <<"http-authn-acl/1">>}, + {deny, <<"t/#">>} + ], + + try + lists:foreach( + fun(Case) -> + test_acl(Case, Pid) + end, + Cases + ) + after + ok = emqx_authn_mqtt_test_client:stop(Pid) + end. + +t_auth_expire(_Config) -> + init_auth(), + + ExpireSec = 3, + WaitTime = timer:seconds(ExpireSec + 1), + ACL = emqx_authn_http_SUITE:acl_rules(), + + set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{ + acl => ACL, + expire_at => + erlang:system_time(second) + ExpireSec + }), + {ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD), + + timer:sleep(WaitTime), + ?assertEqual(false, erlang:is_process_alive(Pid)). + t_is_superuser() -> State = init_auth(), ok = test_is_superuser(State, false), @@ -326,7 +319,7 @@ test_is_superuser(State, ExpectedIsSuperuser) -> Username = <<"u">>, Password = <<"p">>, - set_user_handler(Username, Password, ExpectedIsSuperuser), + set_user_handler(Username, Password, #{is_superuser => ExpectedIsSuperuser}), ClientFirstMessage = esasl_scram:client_first_message(Username), @@ -384,19 +377,20 @@ raw_config() -> }. set_user_handler(Username, Password) -> - set_user_handler(Username, Password, false). -set_user_handler(Username, Password, IsSuperuser) -> + set_user_handler(Username, Password, #{is_superuser => false}). 
+set_user_handler(Username, Password, Extra0) -> %% HTTP Server Handler = fun(Req0, State) -> #{ username := Username } = cowboy_req:match_qs([username], Req0), - UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT, IsSuperuser), + UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT), + Extra = maps:merge(#{is_superuser => false}, Extra0), Req = cowboy_req:reply( 200, #{<<"content-type">> => <<"application/json">>}, - emqx_utils_json:encode(UserInfo), + emqx_utils_json:encode(maps:merge(Extra, UserInfo)), Req0 ), {ok, Req, State} @@ -415,7 +409,7 @@ init_auth(Config) -> {ok, [#{state := State}]} = emqx_authn_chains:list_authenticators(?GLOBAL), State. -make_user_info(Password, Algorithm, IterationCount, IsSuperuser) -> +make_user_info(Password, Algorithm, IterationCount) -> {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info( Password, #{ @@ -426,8 +420,7 @@ make_user_info(Password, Algorithm, IterationCount, IsSuperuser) -> #{ stored_key => binary:encode_hex(StoredKey), server_key => binary:encode_hex(ServerKey), - salt => binary:encode_hex(Salt), - is_superuser => IsSuperuser + salt => binary:encode_hex(Salt) }. receive_packet() -> @@ -438,3 +431,79 @@ receive_packet() -> after 1000 -> ct:fail("Deliver timeout") end. 
+ +create_connection(Username, Password) -> + {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883), + + ClientFirstMessage = esasl_scram:client_first_message(Username), + + ConnectPacket = ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFirstMessage + } + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket), + + %% Intentional sleep to trigger idle timeout for the connection not yet authenticated + ok = ct:sleep(1000), + + ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{'Authentication-Data' := ServerFirstMessage} + ) = receive_packet(), + + {continue, ClientFinalMessage, ClientCache} = + esasl_scram:check_server_first_message( + ServerFirstMessage, + #{ + client_first_message => ClientFirstMessage, + password => Password, + algorithm => ?ALGORITHM + } + ), + + AuthContinuePacket = ?AUTH_PACKET( + ?RC_CONTINUE_AUTHENTICATION, + #{ + 'Authentication-Method' => <<"SCRAM-SHA-512">>, + 'Authentication-Data' => ClientFinalMessage + } + ), + + ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket), + + ?CONNACK_PACKET( + ?RC_SUCCESS, + _, + #{'Authentication-Data' := ServerFinalMessage} + ) = receive_packet(), + + ok = esasl_scram:check_server_final_message( + ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM} + ), + {ok, Pid}. + +test_acl({allow, Topic}, C) -> + ?assertMatch( + [0], + send_subscribe(C, Topic) + ); +test_acl({deny, Topic}, C) -> + ?assertMatch( + [?RC_NOT_AUTHORIZED], + send_subscribe(C, Topic) + ). + +send_subscribe(Client, Topic) -> + TopicOpts = #{nl => 0, rap => 0, rh => 0, qos => 0}, + Packet = ?SUBSCRIBE_PACKET(1, [{Topic, TopicOpts}]), + emqx_authn_mqtt_test_client:send(Client, Packet), + timer:sleep(200), + + ?SUBACK_PACKET(1, ReasonCode) = receive_packet(), + ReasonCode. 
diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl index d59afea2857..9880b71ee50 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl @@ -141,7 +141,9 @@ authenticate( reason => Reason }) end, - emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State); + emqx_utils_scram:authenticate( + AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, [is_superuser] + ); authenticate(_Credential, _State) -> ignore. diff --git a/apps/emqx_utils/src/emqx_utils_scram.erl b/apps/emqx_utils/src/emqx_utils_scram.erl index 9d054370362..cb11082fb9b 100644 --- a/apps/emqx_utils/src/emqx_utils_scram.erl +++ b/apps/emqx_utils/src/emqx_utils_scram.erl @@ -16,17 +16,17 @@ -module(emqx_utils_scram). --export([authenticate/6]). +-export([authenticate/7]). %%------------------------------------------------------------------------------ %% Authentication %%------------------------------------------------------------------------------ -authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, Conf) -> +authenticate(AuthMethod, AuthData, AuthCache, Conf, RetrieveFun, OnErrFun, ResultKeys) -> case ensure_auth_method(AuthMethod, AuthData, Conf) of true -> case AuthCache of #{next_step := client_final} -> - check_client_final_message(AuthData, AuthCache, Conf, OnErrFun); + check_client_final_message(AuthData, AuthCache, Conf, OnErrFun, ResultKeys); _ -> check_client_first_message(AuthData, AuthCache, Conf, RetrieveFun, OnErrFun) end; @@ -64,9 +64,7 @@ check_client_first_message( {error, not_authorized} end. 
-check_client_final_message( - Bin, #{is_superuser := IsSuperuser} = Cache, #{algorithm := Alg}, OnErrFun -) -> +check_client_final_message(Bin, Cache, #{algorithm := Alg}, OnErrFun, ResultKeys) -> case esasl_scram:check_client_final_message( Bin, @@ -74,7 +72,7 @@ check_client_final_message( ) of {ok, ServerFinalMessage} -> - {ok, #{is_superuser => IsSuperuser}, ServerFinalMessage}; + {ok, maps:with(ResultKeys, Cache), ServerFinalMessage}; {error, Reason} -> OnErrFun("check_client_final_message_error", Reason), {error, not_authorized} From 3fae7049031e4b84e9c8ad8b21f56167c95fb07c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 Jul 2024 09:24:31 -0300 Subject: [PATCH 05/19] fix(rule engine tester): fix message publish with bridge source in from clause Fixes https://emqx.atlassian.net/browse/EMQX-12762 --- .../src/emqx_rule_sqltester.erl | 8 +++-- .../test/emqx_rule_engine_api_2_SUITE.erl | 33 +++++++++++++++++++ changes/ce/fix-13527.en.md | 1 + 3 files changed, 39 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-13527.en.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index a3d9d5ebebb..c9be8212719 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -115,11 +115,13 @@ test(#{sql := Sql, context := Context}) -> true -> %% test if the topic matches the topic filters in the rule case emqx_topic:match_any(InTopic, EventTopics) of - true -> test_rule(Sql, Select, Context, EventTopics); - false -> {error, nomatch} + true -> + test_rule(Sql, Select, Context, EventTopics); + false -> + {error, nomatch} end; false -> - case lists:member(InTopic, EventTopics) of + case emqx_topic:match_any(InTopic, EventTopics) of true -> %% the rule is for both publish and events, test it directly test_rule(Sql, Select, Context, EventTopics); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl 
b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl index b2d2fdf8671..b6f3eb30792 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl @@ -332,6 +332,38 @@ t_rule_test_smoke(_Config) -> } ], MultipleFrom = [ + #{ + expected => #{code => 200}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_publish">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => + <<"SELECT\n *\nFROM\n \"t/#\", \"$bridges/mqtt:source\" ">> + } + }, + #{ + expected => #{code => 200}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_publish">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => + <<"SELECT\n *\nFROM\n \"t/#\", \"$sources/mqtt:source\" ">> + } + }, #{ expected => #{code => 200}, input => @@ -395,6 +427,7 @@ do_t_rule_test_smoke(#{input := Input, expected := #{code := ExpectedCode}} = Ca {true, #{ expected => ExpectedCode, hint => maps:get(hint, Case, <<>>), + input => Input, got => Code, resp_body => Body }} diff --git a/changes/ce/fix-13527.en.md b/changes/ce/fix-13527.en.md new file mode 100644 index 00000000000..0c3324e41da --- /dev/null +++ b/changes/ce/fix-13527.en.md @@ -0,0 +1 @@ +Fixed an issue where running a SQL test in Rule Engine for the Message Publish event when a `$bridges/...` source was included in the `FROM` clause would always yield no results. 
From 4e0742c66facdf0f290157b832e5271ee045e42b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 Jul 2024 14:25:20 -0300 Subject: [PATCH 06/19] feat: make kafka producer freely dynamic --- .../emqx_bridge_azure_event_hub_v2_SUITE.erl | 51 ++++------- .../emqx_bridge_confluent_producer_SUITE.erl | 51 ++++------- .../src/emqx_bridge_kafka.erl | 12 --- .../src/emqx_bridge_kafka_impl_producer.erl | 71 ++++++--------- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 89 +++---------------- rel/i18n/emqx_bridge_azure_event_hub.hocon | 10 --- rel/i18n/emqx_bridge_confluent_producer.hocon | 10 --- rel/i18n/emqx_bridge_kafka.hocon | 10 --- 8 files changed, 73 insertions(+), 231 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 0136ec5689c..f2a06cf65e5 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -382,46 +382,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), - ok. 
- -t_pre_configured_topics(Config) -> - ActionConfig0 = ?config(action_config, Config), - ActionConfig = - emqx_utils_maps:deep_merge( - ActionConfig0, - #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), ok. -t_templated_topic_and_no_pre_configured_topics(Config) -> +t_dynamic_topics(Config) -> ActionConfig0 = ?config(action_config, Config), ActionConfig = emqx_utils_maps:deep_merge( ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. 
diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index de92b9327c9..f10e88463f8 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -391,46 +391,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), - ok. - -t_pre_configured_topics(Config) -> - ActionConfig0 = ?config(action_config, Config), - ActionConfig = - emqx_utils_maps:deep_merge( - ActionConfig0, - #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), ok. 
-t_templated_topic_and_no_pre_configured_topics(Config) -> +t_dynamic_topics(Config) -> ActionConfig0 = ?config(action_config, Config), ActionConfig = emqx_utils_maps:deep_merge( ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 9a2fa91cf36..8f72523b1d9 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -307,10 +307,6 @@ fields(kafka_producer_action) -> {tags, emqx_schema:tags_schema()}, {description, emqx_schema:description_schema()} ] ++ producer_opts(action); -fields(pre_configured_topic) -> - [ - {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})} - ]; fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -396,14 +392,6 @@ fields(v1_producer_kafka_opts) -> fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, - {pre_configured_topics, - mk( - hoconsc:array(ref(pre_configured_topic)), - #{ - default => [], - desc => ?DESC("producer_pre_configured_topics") - } - )}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, diff --git 
a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 80de9840209..b358cd42b45 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_kafka_impl_producer). +-feature(maybe_expr, enable). + -behaviour(emqx_resource). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -132,37 +134,22 @@ create_producers_for_bridge_v2( ) -> #{ message := MessageTemplate, - pre_configured_topics := PreConfiguredTopics0, topic := KafkaTopic0, sync_query_timeout := SyncQueryTimeout } = KafkaConfig, - TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0), - PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0], - KafkaTopics0 = + TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0), + MKafkaTopic = case TopicType of - fixed -> - [KafkaTopic | PreConfiguredTopics]; - dynamic -> - PreConfiguredTopics + fixed -> TopicOrTemplate; + dynamic -> dynamic end, - case KafkaTopics0 of - [] -> - throw(<< - "Either the Kafka topic must be fixed (not a template)," - " or at least one pre-defined topic must be set." 
- >>); - _ -> - ok - end, - KafkaTopics = lists:map(fun bin/1, KafkaTopics0), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId), IsDryRun = emqx_resource:is_dry_run(ActionResId), - [AKafkaTopic | _] = KafkaTopics, - ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId ), @@ -179,7 +166,7 @@ create_producers_for_bridge_v2( message_template => compile_message_template(MessageTemplate), kafka_client_id => ClientId, topic_template => TopicTemplate, - pre_configured_topics => KafkaTopics, + topic => MKafkaTopic, producers => Producers, resource_id => ActionResId, connector_resource_id => ConnResId, @@ -195,7 +182,7 @@ create_producers_for_bridge_v2( msg => "failed_to_start_kafka_producer", instance_id => ConnResId, kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + kafka_topic => MKafkaTopic, reason => Reason2 }), throw( @@ -326,7 +313,6 @@ on_query( message_template := MessageTemplate, topic_template := TopicTemplate, producers := Producers, - pre_configured_topics := PreConfiguredTopics, sync_query_timeout := SyncTimeout, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, @@ -339,12 +325,6 @@ on_query( }, try KafkaTopic = render_topic(TopicTemplate, Message), - case lists:member(KafkaTopic, PreConfiguredTopics) of - false -> - throw({unknown_topic, KafkaTopic}); - true -> - ok - end, KafkaMessage = render_message(MessageTemplate, KafkaHeaders, 
Message), ?tp( emqx_bridge_kafka_impl_producer_sync_query, @@ -358,7 +338,7 @@ on_query( throw:bad_topic -> ?tp("kafka_producer_failed_to_render_topic", #{}), {error, {unrecoverable_error, failed_to_render_topic}}; - throw:{unknown_topic, Topic} -> + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> ?tp("kafka_producer_resolved_to_unknown_topic", #{}), {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; throw:#{cause := invalid_partition_count, count := Count} -> @@ -408,7 +388,6 @@ on_query_async( message_template := Template, topic_template := TopicTemplate, producers := Producers, - pre_configured_topics := PreConfiguredTopics, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode @@ -420,12 +399,6 @@ on_query_async( }, try KafkaTopic = render_topic(TopicTemplate, Message), - case lists:member(KafkaTopic, PreConfiguredTopics) of - false -> - throw({unknown_topic, KafkaTopic}); - true -> - ok - end, KafkaMessage = render_message(Template, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_async_query, @@ -439,7 +412,7 @@ on_query_async( throw:bad_topic -> ?tp("kafka_producer_failed_to_render_topic", #{}), {error, {unrecoverable_error, failed_to_render_topic}}; - throw:{unknown_topic, Topic} -> + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> ?tp("kafka_producer_resolved_to_unknown_topic", #{}), {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; throw:#{cause := invalid_partition_count, count := Count} -> @@ -618,12 +591,11 @@ on_get_channel_status( %% connector, thus potentially dropping data held in wolff producer's replayq. The %% only exception is if the topic does not exist ("unhealthy target"). 
#{ - pre_configured_topics := PreConfiguredTopics, + topic := MKafkaTopic, partitions_limit := MaxPartitions } = maps:get(ActionResId, Channels), - [KafkaTopic | _] = PreConfiguredTopics, try - ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), ?status_connected catch throw:{unhealthy_target, Msg} -> @@ -632,22 +604,29 @@ on_get_channel_status( {?status_connecting, {K, E}} end. -check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) -> +check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - ok = check_topic_status(ClientId, Pid, KafkaTopic), - ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions); + maybe + true ?= is_binary(MKafkaTopic), + ok = check_topic_status(ClientId, Pid, MKafkaTopic), + ok = check_if_healthy_leaders( + ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions + ) + else + false -> ok + end; {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }); {error, #{reason := client_supervisor_not_initialized}} -> throw(#{ reason => restarting, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }) end. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6246faaf12b..08b2723e723 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -915,17 +915,17 @@ t_multiple_actions_sharing_topic(Config) -> ), ok. -%% Smoke tests for using a templated topic and a list of pre-configured kafka topics. 
-t_pre_configured_topics(Config) -> +%% Smoke tests for using a templated topic and a dynamic kafka topics. +t_dynamic_topics(Config) -> Type = proplists:get_value(type, Config, ?TYPE), ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), - ActionName = <<"pre_configured_topics">>, + ActionName = <<"dynamic_topics">>, ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), - PreConfigureTopic1 = <<"pct1">>, - PreConfigureTopic2 = <<"pct2">>, - ensure_kafka_topic(PreConfigureTopic1), - ensure_kafka_topic(PreConfigureTopic2), + PreConfiguredTopic1 = <<"pct1">>, + PreConfiguredTopic2 = <<"pct2">>, + ensure_kafka_topic(PreConfiguredTopic1), + ensure_kafka_topic(PreConfiguredTopic2), ActionConfig = emqx_bridge_v2_testlib:parse_and_check( action, Type, @@ -938,11 +938,7 @@ t_pre_configured_topics(Config) -> <<"message">> => #{ <<"key">> => <<"${.clientid}">>, <<"value">> => <<"${.payload.p}">> - }, - <<"pre_configured_topics">> => [ - #{<<"topic">> => PreConfigureTopic1}, - #{<<"topic">> => PreConfigureTopic2} - ] + } } } ) @@ -1001,13 +997,13 @@ t_pre_configured_topics(Config) -> {ok, C} = emqtt:start_link(#{}), {ok, _} = emqtt:connect(C), Payload = fun(Map) -> emqx_utils_json:encode(Map) end, - Offset1 = resolve_kafka_offset(PreConfigureTopic1), - Offset2 = resolve_kafka_offset(PreConfigureTopic2), + Offset1 = resolve_kafka_offset(PreConfiguredTopic1), + Offset2 = resolve_kafka_offset(PreConfiguredTopic2), {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]), {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]), - check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>), - check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>), + check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>), + check_kafka_message_payload(PreConfiguredTopic2, 
Offset2, <<"p2">>), ActionId = emqx_bridge_v2:id(Type, ActionName), ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)), @@ -1046,64 +1042,3 @@ t_pre_configured_topics(Config) -> [] ), ok. - -%% Checks that creating an action with templated topic and no pre-configured kafka topics -%% throws. -t_templated_topic_and_no_pre_configured_topics(Config) -> - Type = proplists:get_value(type, Config, ?TYPE), - ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), - ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), - ActionName = <<"bad_pre_configured_topics">>, - ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), - ActionConfig = emqx_bridge_v2_testlib:parse_and_check( - action, - Type, - ActionName, - emqx_utils_maps:deep_merge( - ActionConfig1, - #{ - <<"parameters">> => #{ - <<"topic">> => <<"pct${.payload.n}">>, - <<"pre_configured_topics">> => [] - } - } - ) - ), - ?check_trace( - #{timetrap => 7_000}, - begin - ConnectorParams = [ - {connector_config, ConnectorConfig}, - {connector_name, ConnectorName}, - {connector_type, Type} - ], - ActionParams = [ - {action_config, ActionConfig}, - {action_name, ActionName}, - {action_type, Type} - ], - {ok, {{_, 201, _}, _, #{}}} = - emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), - - {ok, {{_, 201, _}, _, #{}}} = - emqx_bridge_v2_testlib:create_action_api(ActionParams), - - ?assertMatch( - {ok, - {{_, 200, _}, _, #{ - <<"status_reason">> := - << - "Either the Kafka topic must be fixed (not a template)," - " or at least one pre-defined topic must be set." - >>, - <<"status">> := <<"disconnected">>, - <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] - }}}, - emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName) - ), - - ok - end, - [] - ), - ok. 
diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index e683bc9e97d..7e37d2e4c3a 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -350,14 +350,4 @@ Setting this to a value which is greater than the total number of partitions in partitions_limit.label: """Max Partitions""" -producer_pre_configured_topics.label: -"""Pre-configured Event Hubs""" -producer_pre_configured_topics.desc: -"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Event Hubs Name""" -pre_configured_topic.desc: -"""Event Hubs name""" - } diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 81c2c0a89be..38623502e81 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -350,14 +350,4 @@ server_name_indication.desc: server_name_indication.label: """SNI""" -producer_pre_configured_topics.label: -"""Pre-configured Topics""" -producer_pre_configured_topics.desc: -"""A list of pre-configured topics to be used when using templates to define outgoing topics. 
If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Kafka Topic Name""" -pre_configured_topic.desc: -"""Kafka topic name""" - } diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 59896cc224a..a066d30fcf9 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -446,14 +446,4 @@ server_name_indication.desc: server_name_indication.label: """SNI""" -producer_pre_configured_topics.label: -"""Pre-configured Topics""" -producer_pre_configured_topics.desc: -"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Kafka Topic Name""" -pre_configured_topic.desc: -"""Kafka topic name""" - } From 1d56ac6e5e87c21a3986f79c061f752d95f0ea48 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 Jul 2024 14:26:21 -0300 Subject: [PATCH 07/19] refactor: change topic schema type --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 8f72523b1d9..254e8403640 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -391,7 +391,7 @@ fields(v1_producer_kafka_opts) -> ); fields(producer_kafka_opts) -> [ - {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, + {topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, 
{max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, From 0b0a28ae442ab1e46ad00a24825101f755514dc2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jul 2024 10:45:24 +0800 Subject: [PATCH 08/19] chore: update changes/ee/feat-13504.en.md Co-authored-by: zmstone --- changes/ee/feat-13504.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ee/feat-13504.en.md b/changes/ee/feat-13504.en.md index 20b3aa1e22d..acea1241a69 100644 --- a/changes/ee/feat-13504.en.md +++ b/changes/ee/feat-13504.en.md @@ -2,4 +2,4 @@ Added a HTTP backend for the authentication mechanism `scram`. Note: This is not an implementation of the RFC 7804: Salted Challenge Response HTTP Authentication Mechanism. -This backend is an implementation of scram that uses an external web resource as a source of user information. +This backend is an implementation of scram that uses an external web resource as a source of SCRAM authentication data, including stored key of the client, server key, and the salt. It supports other authentication and authorization extension fields like HTTP auth backend, namely: `is_superuser`, `client_attrs`, `expire_at` and `acl`. 
From 6786c9b51738f30ffd85ed73145ded9913bf1cd6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 29 Jul 2024 09:45:52 -0300 Subject: [PATCH 09/19] refactor: improve descriptions and identifiers Co-authored-by: zmstone --- .../src/emqx_bridge_kafka_impl_producer.erl | 2 +- changes/ee/feat-13452.en.md | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index b358cd42b45..fb7fce63cb0 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -839,7 +839,7 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% metrics. -spec telemetry_handler_id(action_resource_id()) -> binary(). telemetry_handler_id(ActionResId) -> - <<"emqx-bridge-kafka-producer-", ActionResId/binary>>. + ActionResId. uninstall_telemetry_handlers(TelemetryId) -> telemetry:detach(TelemetryId). diff --git a/changes/ee/feat-13452.en.md b/changes/ee/feat-13452.en.md index 95dae8d32ed..7b2427329be 100644 --- a/changes/ee/feat-13452.en.md +++ b/changes/ee/feat-13452.en.md @@ -1 +1,5 @@ -Added to possibility to configure a list of predefined Kafka topics to Kafka producer actions, and also to use templates to define the destination Kafka topic. +Kafka producer action's `topic` config now supports templates. + +The topics must be already created in Kafka. If a message is rendered towards a non-existing topic in Kafka (given Kafka disabled topic auto-creation), the message will fail with an unrecoverable error. Also, if a message does not contain enough information to render to the configured template (e.g.: the template is `t-${t}` and the message context does not define `t`), this message will also fail with an unrecoverable error. + +This same feature is also available for Azure Event Hubs and Confluent Platform producer integrations. 
From 693d5dd39425ad409f49d42d7c43406e1d3bf1c9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 29 Jul 2024 11:37:33 -0300 Subject: [PATCH 10/19] feat: attempt to automatically decode `payload` similar to key and message templates --- .../src/emqx_bridge_kafka_impl_producer.erl | 2 +- .../test/emqx_bridge_v2_kafka_producer_SUITE.erl | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index fb7fce63cb0..a8e1b56dd60 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -469,7 +469,7 @@ render_topic({fixed, KafkaTopic}, _Message) -> KafkaTopic; render_topic({dynamic, Template}, Message) -> try - iolist_to_binary(emqx_template:render_strict(Template, Message)) + iolist_to_binary(emqx_template:render_strict(Template, {emqx_jsonish, Message})) catch error:_Errors -> throw(bad_topic) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 08b2723e723..baa5368cf9d 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -967,12 +967,7 @@ t_dynamic_topics(Config) -> RuleTopic, [ {bridge_name, ActionName} - ], - #{ - sql => - <<"select *, json_decode(payload) as payload from \"", RuleTopic/binary, - "\" ">> - } + ] ), ?assertStatusAPI(Type, ActionName, <<"connected">>), From 8dc1d1424a704898a205f4056129f435c545d884 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 17 Jul 2024 16:05:16 +0800 Subject: [PATCH 11/19] chore: add resource tag for log --- apps/emqx_resource/include/emqx_resource.hrl | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/apps/emqx_resource/include/emqx_resource.hrl 
b/apps/emqx_resource/include/emqx_resource.hrl index 8c2bb39a15b..6603f770851 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -167,4 +167,12 @@ ). -define(TAG, "RESOURCE"). +-define(LOG_LEVEL(_L_), + case _L_ of + true -> info; + false -> warning + end +). +-define(TAG, "RESOURCE"). + -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). From 2924ec582a111a0cd1018c18b54c4f47c0d2e676 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 17 Jul 2024 16:29:18 +0800 Subject: [PATCH 12/19] feat: add unrecoverable_resource_error throttle --- apps/emqx_conf/src/emqx_conf_schema.erl | 3 +- .../src/emqx_resource_buffer_worker.erl | 68 ++++++++++++------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4c72b74b1fe..505aff3e317 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -80,7 +80,8 @@ cannot_publish_to_topic_due_to_not_authorized, cannot_publish_to_topic_due_to_quota_exceeded, connection_rejected_due_to_license_limit_reached, - dropped_msg_due_to_mqueue_is_full + dropped_msg_due_to_mqueue_is_full, + unrecoverable_resource_error ]). %% Callback to upgrade config after loaded from config file but before validation. 
diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 05d42ed1a37..7419820f7ca 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -298,10 +298,10 @@ running(info, {flush_metrics, _Ref}, _Data) -> running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); running(info, Info, _St) -> - ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}, #{tag => ?TAG}), keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = St0) -> @@ -331,10 +331,10 @@ blocked(info, {flush_metrics, _Ref}, _Data) -> blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); blocked(info, Info, _Data) -> - ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}, #{tag => ?TAG}), keep_state_and_data. 
terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> @@ -981,7 +981,11 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE(error, #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }), ok end, Counters = @@ -1021,7 +1025,11 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE(error, #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }), ok end, Counters = @@ -1141,12 +1149,16 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte false -> ok; true -> - ?SLOG(info, #{ - msg => "buffer_worker_dropped_expired_messages", - resource_id => Id, - worker_index => Index, - expired_count => ExpiredCount - }), + ?SLOG( + info, + #{ + msg => "buffer_worker_dropped_expired_messages", + resource_id => Id, + worker_index => Index, + expired_count => ExpiredCount + }, + #{tag => ?TAG} + ), ok end. 
@@ -1556,7 +1568,7 @@ handle_async_reply1( case is_expired(ExpireAt, Now) of true -> IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), - %% evalutate metrics call here since we're not inside + %% evaluate metrics call here since we're not inside %% buffer worker IsAcked andalso begin @@ -1797,12 +1809,16 @@ append_queue(Id, Index, Q, Queries) -> ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), Counters = #{dropped_queue_full => Dropped}, - ?SLOG(info, #{ - msg => "buffer_worker_overflow", - resource_id => Id, - worker_index => Index, - dropped => Dropped - }), + ?SLOG( + info, + #{ + msg => "buffer_worker_overflow", + resource_id => Id, + worker_index => Index, + dropped => Dropped + }, + #{tag => ?TAG} + ), {Items2, Q1, Counters} end, ?tp( @@ -2236,11 +2252,15 @@ adjust_batch_time(Id, RequestTTL, BatchTime0) -> BatchTime = max(0, min(BatchTime0, RequestTTL div 2)), case BatchTime =:= BatchTime0 of false -> - ?SLOG(info, #{ - id => Id, - msg => "adjusting_buffer_worker_batch_time", - new_batch_time => BatchTime - }); + ?SLOG( + info, + #{ + resource_id => Id, + msg => "adjusting_buffer_worker_batch_time", + new_batch_time => BatchTime + }, + #{tag => ?TAG} + ); true -> ok end, From f6f1d32da0ac8d7a21b95ae3d8eb616871bebb14 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 25 Jul 2024 13:06:35 +0800 Subject: [PATCH 13/19] feat: throttle with resource_id --- apps/emqx/include/logger.hrl | 8 +- apps/emqx/src/emqx_log_throttler.erl | 80 +++++++++++++++---- apps/emqx/test/emqx_log_throttler_SUITE.erl | 69 +++++++++++++--- apps/emqx_resource/include/emqx_resource.hrl | 8 -- .../src/emqx_resource_buffer_worker.erl | 30 ++++--- 5 files changed, 149 insertions(+), 46 deletions(-) diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 31fe0e36a96..a7455418d9d 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -38,16 +38,20 @@ ). 
%% NOTE: do not forget to use atom for msg and add every used msg to -%% the default value of `log.thorttling.msgs` list. +%% the default value of `log.throttling.msgs` list. -define(SLOG_THROTTLE(Level, Data), ?SLOG_THROTTLE(Level, Data, #{}) ). -define(SLOG_THROTTLE(Level, Data, Meta), + ?SLOG_THROTTLE(Level, undefined, Data, Meta) +). + +-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta), case logger:allow(Level, ?MODULE) of true -> (fun(#{msg := __Msg} = __Data) -> - case emqx_log_throttler:allow(__Msg) of + case emqx_log_throttler:allow(__Msg, UniqueKey) of true -> logger:log(Level, __Data, Meta); false -> diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index 3ebc268fa63..008bd16633e 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -25,7 +25,7 @@ -export([start_link/0]). %% throttler API --export([allow/1]). +-export([allow/2]). %% gen_server callbacks -export([ @@ -40,23 +40,22 @@ -define(SEQ_ID(Msg), {?MODULE, Msg}). -define(NEW_SEQ, atomics:new(1, [{signed, false}])). -define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)). +-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))). -define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)). -define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)). -define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1). -define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1). --define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)). - -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(atom()) -> boolean(). -allow(Msg) when is_atom(Msg) -> +-spec allow(atom(), any()) -> boolean(). +allow(Msg, UniqueKey) when is_atom(Msg) -> case emqx_logger:get_primary_log_level() of debug -> true; _ -> - do_allow(Msg) + do_allow(Msg, UniqueKey) end. -spec start_link() -> startlink_ret(). 
@@ -68,7 +67,8 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> - ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST), + process_flag(trap_exit, true), + ok = lists:foreach(fun(Msg) -> new_throttler(Msg) end, ?MSGS_LIST), CurrentPeriodMs = ?TIME_WINDOW_MS, TimerRef = schedule_refresh(CurrentPeriodMs), {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. @@ -88,14 +88,19 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> case ?GET_SEQ(Msg) of %% Should not happen, unless the static ids list is updated at run-time. undefined -> - ?NEW_THROTTLE(Msg, ?NEW_SEQ), + new_throttler(Msg), ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), Acc; + SeqMap when is_map(SeqMap) -> + maps:fold( + fun(Key, Ref, Acc0) -> + drop_stats(Ref, emqx_utils:format("~ts:~s", [Msg, Key]), Acc0) + end, + Acc, + SeqMap + ); SeqRef -> - Dropped = ?GET_DROPPED(SeqRef), - ok = ?RESET_SEQ(SeqRef), - ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), - maybe_add_dropped(Msg, Dropped, Acc) + drop_stats(SeqRef, Msg, Acc) end end, #{}, @@ -112,7 +117,34 @@ handle_info(Info, State) -> ?SLOG(error, #{msg => "unxpected_info", info => Info}), {noreply, State}. +drop_stats(SeqRef, Msg, Acc) -> + Dropped = ?GET_DROPPED(SeqRef), + ok = ?RESET_SEQ(SeqRef), + ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), + maybe_add_dropped(Msg, Dropped, Acc). + terminate(_Reason, _State) -> + lists:foreach( + fun(Msg) -> + case ?GET_SEQ(Msg) of + undefined -> + ok; + SeqMap when is_map(SeqMap) -> + maps:foreach( + fun(_, Ref) -> + ok = ?RESET_SEQ(Ref) + end, + SeqMap + ); + SeqRef -> + %% atomics don't have erase API... + %% (if nobody hold the ref, the atomics should erase automatically?) + ok = ?RESET_SEQ(SeqRef) + end, + ?ERASE_SEQ(Msg) + end, + ?MSGS_LIST + ), ok. 
code_change(_OldVsn, State, _Extra) -> @@ -122,17 +154,27 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions %%-------------------------------------------------------------------- -do_allow(Msg) -> +do_allow(Msg, UniqueKey) -> case persistent_term:get(?SEQ_ID(Msg), undefined) of undefined -> %% This is either a race condition (emqx_log_throttler is not started yet) %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is %% not added to the default value of `log.throttling.msgs`. - ?SLOG(info, #{ - msg => "missing_log_throttle_sequence", + ?SLOG(debug, #{ + msg => "log_throttle_disabled", throttled_msg => Msg }), true; + %% e.g: unrecoverable msg throttle according resource_id + SeqMap when is_map(SeqMap) -> + case maps:find(UniqueKey, SeqMap) of + {ok, SeqRef} -> + ?IS_ALLOWED(SeqRef); + error -> + SeqRef = ?NEW_SEQ, + new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}), + true + end; SeqRef -> ?IS_ALLOWED(SeqRef) end. @@ -154,3 +196,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) -> schedule_refresh(PeriodMs) -> ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}), erlang:send_after(PeriodMs, ?MODULE, refresh). + +new_throttler(unrecoverable_resource_error = Msg) -> + persistent_term:put(?SEQ_ID(Msg), #{}); +new_throttler(Msg) -> + persistent_term:put(?SEQ_ID(Msg), ?NEW_SEQ). + +new_throttler(Msg, Map) -> + persistent_term:put(?SEQ_ID(Msg), Map). diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index 8b3ac020755..23150a3b1b5 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -26,6 +26,7 @@ %% Have to use real msgs, as the schema is guarded by enum. -define(THROTTLE_MSG, authorization_permission_denied). -define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized). +-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error). -define(TIME_WINDOW, <<"1s">>). all() -> emqx_common_test_helpers:all(?MODULE). 
@@ -59,6 +60,11 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)), emqx_config:delete_override_conf_files(). +init_per_testcase(t_throttle_recoverable_msg, Config) -> + ok = snabbkaffe:start_trace(), + [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}), + Config; init_per_testcase(t_throttle_add_new_msg, Config) -> ok = snabbkaffe:start_trace(), [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), @@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) -> ok = snabbkaffe:start_trace(), Config. +end_per_testcase(t_throttle_recoverable_msg, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), + ok; end_per_testcase(t_throttle_add_new_msg, _Config) -> ok = snabbkaffe:stop(), {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), @@ -101,8 +111,8 @@ t_throttle(_Config) -> 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -115,14 +125,48 @@ t_throttle(_Config) -> [] ). +t_throttle_recoverable_msg(_Config) -> + ResourceId = <<"resource_id">>, + ThrottledMsg = emqx_utils:format("~ts:~s", [?THROTTLE_UNRECOVERABLE_MSG, ResourceId]), + ?check_trace( + begin + %% Warm-up and block to increase the probability that next events + %% will be in the same throttling time window. 
+ {ok, _} = ?block_until( + #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG}, + 5000 + ), + {_, {ok, _}} = ?wait_async_action( + events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId), + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg + }, + 5000 + ), + + ?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + t_throttle_add_new_msg(_Config) -> ?check_trace( begin {ok, _} = ?block_until( #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -137,8 +181,8 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), timer:sleep(10), ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). 
@@ -168,8 +212,8 @@ t_throttle_debug_primary_level(_Config) -> #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -187,10 +231,13 @@ t_throttle_debug_primary_level(_Config) -> %%-------------------------------------------------------------------- events(Msg) -> - events(100, Msg). + events(100, Msg, undefined). + +events(Msg, Id) -> + events(100, Msg, Id). -events(N, Msg) -> - [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)]. +events(N, Msg, Id) -> + [emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)]. module_exists(Mod) -> case erlang:module_loaded(Mod) of diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 6603f770851..8c2bb39a15b 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -167,12 +167,4 @@ ). -define(TAG, "RESOURCE"). --define(LOG_LEVEL(_L_), - case _L_ of - true -> info; - false -> warning - end -). --define(TAG, "RESOURCE"). - -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). 
diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 7419820f7ca..a203247e9cf 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -981,11 +981,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> true -> PostFn = fun() -> - ?SLOG_THROTTLE(error, #{ - resource_id => Id, - msg => unrecoverable_resource_error, - reason => Reason - }), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters = @@ -1025,11 +1030,16 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT true -> PostFn = fun() -> - ?SLOG_THROTTLE(error, #{ - resource_id => Id, - msg => unrecoverable_resource_error, - reason => Reason - }), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters = From e08425e67d82e357adb0250df96e0999baae0f4e Mon Sep 17 00:00:00 2001 From: zmstone Date: Fri, 26 Jul 2024 15:16:07 +0200 Subject: [PATCH 14/19] refactor(log-throttler): remove unnecessary code there is no need to reset counters before erasing --- apps/emqx/src/emqx_log_throttler.erl | 50 +++++++++------------ apps/emqx/test/emqx_log_throttler_SUITE.erl | 11 +++-- 2 files changed, 28 insertions(+), 33 deletions(-) diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index 008bd16633e..928580e2bd9 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -49,8 +49,15 @@ -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(atom(), any()) -> boolean(). 
-allow(Msg, UniqueKey) when is_atom(Msg) -> +%% @doc Check if a throttled log message is allowed to pass down to the logger this time. +%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined' +%% for predefined message IDs. +%% For relatively static resources created from configurations such as data integration +%% resource IDs `UniqueKey' should be of `binary()' type. +-spec allow(atom(), undefined | binary()) -> boolean(). +allow(Msg, UniqueKey) when + is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined) +-> case emqx_logger:get_primary_log_level() of debug -> true; @@ -68,7 +75,7 @@ start_link() -> init([]) -> process_flag(trap_exit, true), - ok = lists:foreach(fun(Msg) -> new_throttler(Msg) end, ?MSGS_LIST), + ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST), CurrentPeriodMs = ?TIME_WINDOW_MS, TimerRef = schedule_refresh(CurrentPeriodMs), {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. @@ -86,15 +93,16 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> DroppedStats = lists:foldl( fun(Msg, Acc) -> case ?GET_SEQ(Msg) of - %% Should not happen, unless the static ids list is updated at run-time. undefined -> + %% Should not happen, unless the static ids list is updated at run-time. new_throttler(Msg), ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), Acc; SeqMap when is_map(SeqMap) -> maps:fold( fun(Key, Ref, Acc0) -> - drop_stats(Ref, emqx_utils:format("~ts:~s", [Msg, Key]), Acc0) + ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]), + drop_stats(Ref, ID, Acc0) end, Acc, SeqMap @@ -124,27 +132,9 @@ drop_stats(SeqRef, Msg, Acc) -> maybe_add_dropped(Msg, Dropped, Acc). terminate(_Reason, _State) -> - lists:foreach( - fun(Msg) -> - case ?GET_SEQ(Msg) of - undefined -> - ok; - SeqMap when is_map(SeqMap) -> - maps:foreach( - fun(_, Ref) -> - ok = ?RESET_SEQ(Ref) - end, - SeqMap - ); - SeqRef -> - %% atomics don't have erase API... 
- %% (if nobody hold the ref, the atomics should erase automatically?) - ok = ?RESET_SEQ(SeqRef) - end, - ?ERASE_SEQ(Msg) - end, - ?MSGS_LIST - ), + %% atomics do not have delete/remove/release/deallocate API + %% after the reference is garbage-collected the resource is released + lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST), ok. code_change(_OldVsn, State, _Extra) -> @@ -198,9 +188,9 @@ schedule_refresh(PeriodMs) -> erlang:send_after(PeriodMs, ?MODULE, refresh). new_throttler(unrecoverable_resource_error = Msg) -> - persistent_term:put(?SEQ_ID(Msg), #{}); + new_throttler(Msg, #{}); new_throttler(Msg) -> - persistent_term:put(?SEQ_ID(Msg), ?NEW_SEQ). + new_throttler(Msg, ?NEW_SEQ). -new_throttler(Msg, Map) -> - persistent_term:put(?SEQ_ID(Msg), Map). +new_throttler(Msg, AtomicOrEmptyMap) -> + persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap). diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index 23150a3b1b5..f95d6296932 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -127,7 +127,7 @@ t_throttle(_Config) -> t_throttle_recoverable_msg(_Config) -> ResourceId = <<"resource_id">>, - ThrottledMsg = emqx_utils:format("~ts:~s", [?THROTTLE_UNRECOVERABLE_MSG, ResourceId]), + ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]), ?check_trace( begin %% Warm-up and block to increase the probability that next events @@ -181,10 +181,15 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes + Pid = erlang:whereis(emqx_log_throttler), ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), - timer:sleep(10), - ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). 
+ %% assert process is not restarted + ?assertEqual(Pid, erlang:whereis(emqx_log_throttler)), + %% make a gen_call to ensure the process is alive + %% note: this call result in an 'unexpected_call' error log. + ?assertEqual(ignored, gen_server:call(Pid, probe)), + ok. t_update_time_window(_Config) -> ?check_trace( From eab440e0c16c41d3e8fe684f0db66455fb9830b1 Mon Sep 17 00:00:00 2001 From: zmstone Date: Fri, 26 Jul 2024 15:19:10 +0200 Subject: [PATCH 15/19] docs: add changelog for PR 13528 --- changes/ce/feat-13528.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/feat-13528.en.md diff --git a/changes/ce/feat-13528.en.md b/changes/ce/feat-13528.en.md new file mode 100644 index 00000000000..f761e9565f5 --- /dev/null +++ b/changes/ce/feat-13528.en.md @@ -0,0 +1 @@ +Add log throttling for data integration unrecoverable errors. From a49cd78aae3b85c8d97bf64420db3013e100c4db Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 29 Jul 2024 23:54:00 +0200 Subject: [PATCH 16/19] refactor: force getenv to access only OS env with prefix EMQXVAR_ --- apps/emqx_utils/src/emqx_variform_bif.erl | 2 +- apps/emqx_utils/test/emqx_variform_bif_tests.erl | 2 +- changes/ce/feat-13507.en.md | 6 ++++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/emqx_utils/src/emqx_variform_bif.erl b/apps/emqx_utils/src/emqx_variform_bif.erl index e66b8e47dd1..09048a69755 100644 --- a/apps/emqx_utils/src/emqx_variform_bif.erl +++ b/apps/emqx_utils/src/emqx_variform_bif.erl @@ -583,7 +583,7 @@ getenv(Bin) when is_binary(Bin) -> EnvKey = ?ENV_CACHE(Bin), case persistent_term:get(EnvKey, undefined) of undefined -> - Name = erlang:binary_to_list(Bin), + Name = "EMQXVAR_" ++ erlang:binary_to_list(Bin), Result = case os:getenv(Name) of false -> diff --git a/apps/emqx_utils/test/emqx_variform_bif_tests.erl b/apps/emqx_utils/test/emqx_variform_bif_tests.erl index 36235be40d2..aa6724de5b8 100644 --- a/apps/emqx_utils/test/emqx_variform_bif_tests.erl +++ 
b/apps/emqx_utils/test/emqx_variform_bif_tests.erl @@ -77,5 +77,5 @@ system_test() -> EnvName = erlang:atom_to_list(?MODULE), EnvVal = erlang:atom_to_list(?FUNCTION_NAME), EnvNameBin = erlang:list_to_binary(EnvName), - os:putenv(EnvName, EnvVal), + os:putenv("EMQXVAR_" ++ EnvName, EnvVal), ?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)). diff --git a/changes/ce/feat-13507.en.md b/changes/ce/feat-13507.en.md index 115fa49a975..026cf6bf478 100644 --- a/changes/ce/feat-13507.en.md +++ b/changes/ce/feat-13507.en.md @@ -1,2 +1,4 @@ -Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables. -Note this value is immutable once loaded from the environment. +Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables with below limitations. + +- Prefix `EMQXVAR_` is added before reading from OS environment variables. i.e. `getenv('FOO_BAR')` is to read `EMQXVAR_FOO_BAR`. +- The values are immutable once loaded from the OS environment. From 3d1f0c756cd8a63412858c2b631b7879fba5ccc1 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 29 Jul 2024 15:29:01 +0800 Subject: [PATCH 17/19] feat: call plugin's app module `on_config_changed/2` callback assume the module: `[PluginName]_app` --- apps/emqx_plugins/src/emqx_plugins.erl | 30 ++++++++++++++++++++++++-- changes/ce/feat-13548.en.md | 6 ++++++ 2 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 changes/ce/feat-13548.en.md diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 253d1b4ef91..56a88d7cbc5 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -356,13 +356,13 @@ get_config_bin(NameVsn) -> %% RPC call from Management API or CLI. %% The plugin config Json Map was valid by avro schema %% Or: if no and plugin config ALWAYS be valid before calling this function. 
-put_config(NameVsn, ConfigJsonMap, AvroValue) when not is_binary(NameVsn) -> +put_config(NameVsn, ConfigJsonMap, AvroValue) when (not is_binary(NameVsn)) -> put_config(bin(NameVsn), ConfigJsonMap, AvroValue); put_config(NameVsn, ConfigJsonMap, _AvroValue) -> HoconBin = hocon_pp:do(ConfigJsonMap, #{}), ok = backup_and_write_hocon_bin(NameVsn, HoconBin), - %% TODO: callback in plugin's on_config_changed (config update by mgmt API) %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2) + ok = maybe_call_on_config_changed(NameVsn, ConfigJsonMap), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap), ok. @@ -373,6 +373,32 @@ restart(NameVsn) -> {error, Reason} -> {error, Reason} end. +%% @doc Call plugin's callback on_config_changed/2 +maybe_call_on_config_changed(NameVsn, NewConf) -> + FuncName = on_config_changed, + maybe + {ok, PluginAppModule} ?= app_module_name(NameVsn), + true ?= erlang:function_exported(PluginAppModule, FuncName, 2), + {ok, OldConf} = get_config(NameVsn), + _ = erlang:apply(PluginAppModule, FuncName, [OldConf, NewConf]) + else + {error, Reason} -> + ?SLOG(info, #{msg => "failed_to_call_on_config_changed", reason => Reason}); + false -> + ?SLOG(info, #{msg => "on_config_changed_callback_not_exported"}); + _ -> + ok + end. + +app_module_name(NameVsn) -> + case read_plugin_info(NameVsn, #{}) of + {ok, #{<<"name">> := Name} = _PluginInfo} -> + emqx_utils:safe_to_existing_atom(<>); + {error, Reason} -> + ?SLOG(error, Reason#{msg => "failed_to_read_plugin_info"}), + {error, Reason} + end. + %% @doc List all installed plugins. %% Including the ones that are installed, but not enabled in config. -spec list() -> [plugin_info()]. 
diff --git a/changes/ce/feat-13548.en.md b/changes/ce/feat-13548.en.md new file mode 100644 index 00000000000..75b56cd4323 --- /dev/null +++ b/changes/ce/feat-13548.en.md @@ -0,0 +1,6 @@ +Optionally calls the `on_config_changed/2` callback function when the plugin configuration is updated via the REST API. + +This callback function is assumed to be exported by the `_app` module. +i.e: +Plugin NameVsn: `my_plugin-1.0.0` +This callback function is assumed to be `my_plugin_app:on_config_changed/2` From e6bfc14cc9f8f7c792dc55c96fe81e38f9acb237 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 31 Jul 2024 09:26:44 +0800 Subject: [PATCH 18/19] fix: try-catch optional `on_config_changed/2` plugin app callback --- apps/emqx_plugins/src/emqx_plugins.erl | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 56a88d7cbc5..94c6aa4e138 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -380,7 +380,18 @@ maybe_call_on_config_changed(NameVsn, NewConf) -> {ok, PluginAppModule} ?= app_module_name(NameVsn), true ?= erlang:function_exported(PluginAppModule, FuncName, 2), {ok, OldConf} = get_config(NameVsn), - _ = erlang:apply(PluginAppModule, FuncName, [OldConf, NewConf]) + try erlang:apply(PluginAppModule, FuncName, [OldConf, NewConf]) of + _ -> ok + catch + Class:CatchReason:Stacktrace -> + ?SLOG(error, #{ + msg => "failed_to_call_on_config_changed", + exception => Class, + reason => CatchReason, + stacktrace => Stacktrace + }), + ok + end else {error, Reason} -> ?SLOG(info, #{msg => "failed_to_call_on_config_changed", reason => Reason}); From 85cff5e7ebf7c25d2d4298995a5dd2c614cd333b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 31 Jul 2024 09:14:29 -0300 Subject: [PATCH 19/19] fix: merge conflicts --- .../src/emqx_bridge_kafka_impl_producer.erl | 12 +++++++----- 
.../test/emqx_bridge_v2_kafka_producer_SUITE.erl | 15 +++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index c886b8d5865..1b18a1767a8 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -564,7 +564,7 @@ on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) -> %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. on_get_status( - _InstId, + ConnResId, #{client_id := ClientId} = State ) -> %% Note: we must avoid returning `?status_disconnected' here if the connector ever was @@ -574,7 +574,7 @@ on_get_status( %% held in wolff producer's replayq. case check_client_connectivity(ClientId) of ok -> - maybe_check_health_check_topic(State); + maybe_check_health_check_topic(ConnResId, State); {error, {find_client, _Error}} -> ?status_connecting; {error, {connectivity, Error}} -> @@ -648,21 +648,23 @@ check_client_connectivity(ClientId) -> {error, {find_client, Reason}} end. 
-maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when +maybe_check_health_check_topic(ConnResId, #{health_check_topic := Topic} = ConnectorState) when is_binary(Topic) -> #{client_id := ClientId} = ConnectorState, MaxPartitions = all_partitions, - try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of + try check_topic_and_leader_connections(ConnResId, ClientId, Topic, MaxPartitions) of ok -> ?status_connected catch + throw:{unhealthy_target, Msg} -> + {?status_disconnected, ConnectorState, Msg}; throw:#{reason := {connection_down, _} = Reason} -> {?status_disconnected, ConnectorState, Reason}; throw:#{reason := Reason} -> {?status_connecting, ConnectorState, Reason} end; -maybe_check_health_check_topic(_) -> +maybe_check_health_check_topic(_ConnResId, _ConnState) -> %% Cannot infer further information. Maybe upgraded from older version. ?status_connected. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 5e9d53fc5fc..1db3c17252a 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -740,6 +740,21 @@ t_connector_health_check_topic(_Config) -> emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1) ), + %% By providing an inexistent health check topic, we should detect it's + %% disconnected without the need for an action. + ConnectorConfig2 = connector_config(#{ + <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), + <<"health_check_topic">> => <<"i-dont-exist-999">> + }), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Unknown topic or partition", _/binary>> + }}}, + emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig2) + ), + ok end, []