Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release-58' into sync-release-58…
Browse files Browse the repository at this point in the history
…-20241029-201938
  • Loading branch information
thalesmg committed Oct 29, 2024
2 parents 1677f5c + 144c48a commit 1f9e232
Show file tree
Hide file tree
Showing 31 changed files with 195 additions and 105 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_audit/src/emqx_audit.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_audit, [
{description, "Audit log for EMQX"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{mod, {emqx_audit_app, []}},
{applications, [kernel, stdlib, emqx]},
Expand Down
13 changes: 9 additions & 4 deletions apps/emqx_audit/src/emqx_audit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

-define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]).

-define(CHARS_LIMIT_IN_DB, 1024).

-ifdef(TEST).
-define(INTERVAL, 100).
-else.
Expand All @@ -38,8 +40,8 @@
to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
#?AUDIT{
operation_id = <<"">>,
operation_type = atom_to_binary(Cmd),
args = Args,
operation_type = truncate_large_term(Cmd),
args = truncate_large_term(Args),
operation_result = <<"">>,
failure = <<"">>,
duration_ms = DurationMs,
Expand All @@ -65,7 +67,7 @@ to_audit(#{from := erlang_console, function := F, args := Args}) ->
http_method = <<"">>,
http_request = <<"">>,
duration_ms = 0,
args = iolist_to_binary(io_lib:format("~p: ~ts", [F, Args]))
args = truncate_large_term({F, Args})
};
to_audit(#{from := From} = Log) when is_atom(From) ->
#{
Expand Down Expand Up @@ -93,7 +95,7 @@ to_audit(#{from := From} = Log) when is_atom(From) ->
%% request detail
http_status_code = StatusCode,
http_method = Method,
http_request = Request,
http_request = truncate_large_term(Request),
duration_ms = DurationMs,
args = <<"">>
}.
Expand Down Expand Up @@ -243,3 +245,6 @@ log_to_file(Level, Meta, #{module := Module} = Handler) ->
)
end
end.

truncate_large_term(Req) ->
unicode:characters_to_binary(io_lib:format("~0p", [Req], [{chars_limit, ?CHARS_LIMIT_IN_DB}])).
11 changes: 3 additions & 8 deletions apps/emqx_audit/test/emqx_audit_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,7 @@ t_http_api(_) ->
<<"operation_id">> := <<"/configs/global_zone">>,
<<"source_ip">> := <<"127.0.0.1">>,
<<"source">> := _,
<<"http_request">> := #{
<<"method">> := <<"put">>,
<<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}},
<<"bindings">> := _,
<<"headers">> := #{<<"authorization">> := <<"******">>}
},
<<"http_request">> := _,
<<"http_status_code">> := 200,
<<"operation_result">> := <<"success">>,
<<"operation_type">> := <<"configs">>
Expand Down Expand Up @@ -166,7 +161,7 @@ t_cli(_Config) ->
<<"operation_id">> := <<"">>,
<<"source_ip">> := <<"">>,
<<"operation_type">> := <<"conf">>,
<<"args">> := [<<"show">>, <<"log">>],
<<"args">> := <<"[<<\"show\">>,<<\"log\">>]">>,
<<"node">> := _,
<<"source">> := <<"">>,
<<"http_request">> := <<"">>
Expand All @@ -184,7 +179,7 @@ t_cli(_Config) ->
{ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader),
#{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]),
?assertMatch(
[ShowLogEntry, #{<<"operation_type">> := <<"emqx">>, <<"args">> := [<<"start">>]}],
[ShowLogEntry, #{<<"operation_type">> := <<"emqx">>, <<"args">> := <<"[<<\"start\">>]">>}],
Data1
),
{ok, Res2} = emqx_mgmt_api_test_util:request_api(
Expand Down
13 changes: 12 additions & 1 deletion apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

-define(ROOT_KEY_ACTIONS, actions).
-define(ROOT_KEY_SOURCES, sources).
-define(tpal(MSG), begin
ct:pal(MSG),
?tp(notice, MSG, #{})
end).

%% ct setup helpers

Expand Down Expand Up @@ -946,7 +950,9 @@ t_consume(Config, Opts) ->
check_fn := CheckFn,
produce_tracepoint := TracePointFn
} = Opts,
TestTimeout = maps:get(test_timeout, Opts, 60_000),
?check_trace(
#{timetrap => TestTimeout},
begin
ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
case ConsumerReadyTPFn of
Expand All @@ -957,9 +963,12 @@ t_consume(Config, Opts) ->
Predicate, _NEvents = 1, ConsumerReadyTimeout
)
end,
?tpal("creating connector and source"),
?assertMatch({ok, _}, create_bridge_api(Config)),
?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
?tpal("adding hookpoint"),
ok = add_source_hookpoint(Config),
?tpal("waiting until connected"),
?retry(
_Sleep = 200,
_Attempts = 20,
Expand All @@ -968,14 +977,16 @@ t_consume(Config, Opts) ->
health_check_channel(Config)
)
),
?tpal("producing message and waiting for it to be consumed"),
?assertMatch(
{_, {ok, _}},
snabbkaffe:wait_async_action(
ProduceFn,
TracePointFn,
15_000
infinity
)
),
?tpal("waiting for consumed message"),
receive
{consumed_message, Message} ->
CheckFn(Message)
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge_azure_event_hub/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, "4.0.3"},
{kafka_protocol, "4.1.8"},
{kafka_protocol, "4.1.9"},
{brod_gssapi, "0.1.3"},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
{snappyer, "1.2.9"},
{brod, "4.3.1"},
{snappyer, "1.2.10"},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge_confluent/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, "4.0.3"},
{kafka_protocol, "4.1.8"},
{kafka_protocol, "4.1.9"},
{brod_gssapi, "0.1.3"},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
{snappyer, "1.2.9"},
{brod, "4.3.1"},
{snappyer, "1.2.10"},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,9 @@ t_bad_timestamp(Config) ->
[
#{
error := [
{error, {bad_timestamp, <<"bad_timestamp">>}}
{error,
{bad_timestamp,
{non_integer_timestamp, <<"bad_timestamp">>}}}
]
}
],
Expand All @@ -883,7 +885,7 @@ t_bad_timestamp(Config) ->
{sync, false} ->
?assertEqual(
{error, [
{error, {bad_timestamp, <<"bad_timestamp">>}}
{error, {bad_timestamp, {non_integer_timestamp, <<"bad_timestamp">>}}}
]},
Return
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,13 @@ parse_timestamp([TsBin]) ->
{ok, binary_to_integer(TsBin)}
catch
_:_ ->
{error, TsBin}
end.
{error, {non_integer_timestamp, TsBin}}
end;
parse_timestamp(InvalidTs) ->
%% The timestamp field must be a single integer or a single placeholder. i.e. the
%% following is not allowed:
%% - weather,location=us-midwest,season=summer temperature=82 ${timestamp}00
{error, {unsupported_placeholder_usage_for_timestamp, InvalidTs}}.

continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
case line_to_point(Data, Item) of
Expand Down
12 changes: 8 additions & 4 deletions apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,10 @@ t_tag_set_use_literal_value(Config) ->
?assertEqual(TsStr, TimeReturned).

t_bad_timestamp(Config) ->
test_bad_timestamp(Config, <<"bad_timestamp">>, non_integer_timestamp),
test_bad_timestamp(Config, <<"${timestamp}000">>, unsupported_placeholder_usage_for_timestamp).

test_bad_timestamp(Config, Timestamp, ErrTag) ->
InfluxDBType = ?config(influxdb_type, Config),
InfluxDBName = ?config(influxdb_name, Config),
QueryMode = ?config(query_mode, Config),
Expand All @@ -929,7 +933,7 @@ t_bad_timestamp(Config) ->
%% N.B.: this single space characters are relevant
<<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
"uint_value=${payload.uint_key}u,"
"bool=${payload.bool}", " ", "bad_timestamp">>,
"bool=${payload.bool}", " ", Timestamp/binary>>,
%% append this to override the config
InfluxDBConfigString1 =
io_lib:format(
Expand Down Expand Up @@ -983,16 +987,16 @@ t_bad_timestamp(Config) ->
[
#{
error := [
{error, {bad_timestamp, <<"bad_timestamp">>}}
{error, {bad_timestamp, {ErrTag, _}}}
]
}
],
?of_kind(influxdb_connector_send_query_error, Trace)
);
{sync, false} ->
?assertEqual(
?assertMatch(
{error, [
{error, {bad_timestamp, <<"bad_timestamp">>}}
{error, {bad_timestamp, {ErrTag, _}}}
]},
Return
);
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge_kafka/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, "4.0.3"},
{kafka_protocol, "4.1.8"},
{kafka_protocol, "4.1.9"},
{brod_gssapi, "0.1.3"},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
{snappyer, "1.2.9"},
{brod, "4.3.1"},
{snappyer, "1.2.10"},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
{Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3),
TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka],
TopLevelCfg = maps:with(TopLevelCfgKeys, Params1),
%% `topic' is v2-only
Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1),
Params = maps:with(v1_source_parameters(), Params1),
V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg),
V1Config = emqx_utils_maps:update_if_present(
<<"resource_opts">>,
Expand All @@ -64,6 +63,14 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
%% Internal helper functions
%%------------------------------------------------------------------------------------------

v1_source_parameters() ->
[
<<"max_batch_bytes">>,
<<"max_rejoin_attempts">>,
<<"offset_commit_interval_seconds">>,
<<"offset_reset_policy">>
].

%% The new schema has a single kafka topic, so we take it from topic mapping when
%% converting from v1.
maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ fields(source_parameters) ->
required => false,
desc => ?DESC(group_id)
}
)},
{max_wait_time,
mk(
emqx_schema:timeout_duration_ms(),
#{
default => <<"1s">>,
desc => ?DESC("max_wait_time")
}
)}
| Fields
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
group_id => binary(),
key_encoding_mode := encoding_mode(),
max_batch_bytes := emqx_schema:bytesize(),
max_wait_time := non_neg_integer(),
max_rejoin_attempts := non_neg_integer(),
offset_commit_interval_seconds := pos_integer(),
offset_reset_policy := offset_reset_policy(),
Expand Down Expand Up @@ -414,6 +415,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
parameters := #{
key_encoding_mode := KeyEncodingMode,
max_batch_bytes := MaxBatchBytes,
max_wait_time := MaxWaitTime,
max_rejoin_attempts := MaxRejoinAttempts,
offset_commit_interval_seconds := OffsetCommitInterval,
offset_reset_policy := OffsetResetPolicy0,
Expand Down Expand Up @@ -445,6 +447,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
ConsumerConfig = [
{begin_offset, BeginOffset},
{max_bytes, MaxBatchBytes},
{max_wait_time, MaxWaitTime},
{offset_reset_policy, OffsetResetPolicy}
],
GroupConfig = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
-define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>).
-define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>).
-define(SOURCE_TYPE_BIN, <<"kafka_consumer">>).
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]).

%%------------------------------------------------------------------------------
%% CT boilerplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ source_config(Overrides0) ->
#{
<<"key_encoding_mode">> => <<"none">>,
<<"max_batch_bytes">> => <<"896KB">>,
<<"max_wait_time">> => <<"500ms">>,
<<"max_rejoin_attempts">> => <<"5">>,
<<"offset_reset_policy">> => <<"latest">>,
<<"offset_reset_policy">> => <<"earliest">>,
<<"topic">> => <<"please override">>,
<<"value_encoding_mode">> => <<"none">>
},
Expand All @@ -217,7 +218,7 @@ source_config(Overrides0) ->
<<"resume_interval">> => <<"2s">>
}
},
maps:merge(CommonConfig, Overrides).
emqx_utils_maps:deep_merge(CommonConfig, Overrides).

%%------------------------------------------------------------------------------
%% Testcases
Expand Down Expand Up @@ -279,6 +280,7 @@ t_consume(Config) ->
ok = emqx_bridge_v2_testlib:t_consume(
Config,
#{
test_timeout => timer:seconds(20),
consumer_ready_tracepoint => ?match_n_events(
NumPartitions,
#{?snk_kind := kafka_consumer_subscriber_init}
Expand Down
11 changes: 8 additions & 3 deletions apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,18 @@ login(post, #{bindings := #{backend := Backend}, body := Body} = Request) ->
request => emqx_utils:redact(Request)
}),
Redirect;
{error, Reason} ->
{error, Reason0} ->
Reason = emqx_utils:redact(Reason0),
?SLOG(info, #{
msg => "dashboard_sso_login_failed",
request => emqx_utils:redact(Request),
reason => emqx_utils:redact(Reason)
reason => Reason
}),
{401, #{code => ?BAD_USERNAME_OR_PWD, message => <<"Auth failed">>}}
{401, #{
code => ?BAD_USERNAME_OR_PWD,
message => <<"Auth failed">>,
reason => Reason
}}
end
end.

Expand Down
Loading

0 comments on commit 1f9e232

Please sign in to comment.