Skip to content

Commit

Permalink
Add AMQP 1.0 event exchange test
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Oct 29, 2024
1 parent 3e57a38 commit f55cd21
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 23 deletions.
2 changes: 1 addition & 1 deletion deps/rabbitmq_event_exchange/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ define PROJECT_APP_EXTRA_KEYS
endef

DEPS = rabbit_common rabbit
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_amqp_client

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
Expand Down
104 changes: 82 additions & 22 deletions deps/rabbitmq_event_exchange/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,41 @@

all() ->
[
queue_created,
authentication,
audit_queue,
audit_exchange,
audit_exchange_internal_parameter,
audit_binding,
audit_vhost,
audit_vhost_deletion,
audit_channel,
audit_connection,
audit_direct_connection,
audit_consumer,
audit_parameter,
audit_policy,
audit_vhost_limit,
audit_user,
audit_user_password,
audit_user_tags,
audit_permission,
audit_topic_permission,
resource_alarm,
unregister
{group, amqp},
{group, amqpl}
].

groups() ->
[
{amqp, [shuffle],
[
amqp_connection
]},
{amqpl, [],
[
queue_created,
authentication,
audit_queue,
audit_exchange,
audit_exchange_internal_parameter,
audit_binding,
audit_vhost,
audit_vhost_deletion,
audit_channel,
audit_connection,
audit_direct_connection,
audit_consumer,
audit_parameter,
audit_policy,
audit_vhost_limit,
audit_user,
audit_user_password,
audit_user_tags,
audit_permission,
audit_topic_permission,
resource_alarm,
unregister
]}
].

%% -------------------------------------------------------------------
Expand All @@ -60,6 +73,9 @@ end_per_suite(Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(amqp, Config) ->
{ok, _} = application:ensure_all_started(rabbitmq_amqp_client),
Config;
init_per_group(_, Config) ->
Config.

Expand Down Expand Up @@ -453,6 +469,35 @@ unregister(Config) ->
lookup, [X])),
ok.

%% Test that the event exchange works when publising and consuming via AMQP 1.0.
amqp_connection(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName),
{Connection1, Session, LinkPair} = amqp_init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName,#{}),
ok = rabbitmq_amqp_client:bind_queue(
LinkPair, QName, <<"amq.rabbitmq.event">>, <<"connection.*">>, #{}),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address, settled),

OpnConf0 = amqp_connection_config(Config),
OpnConf = maps:update(container_id, <<"2nd container">>, OpnConf0),
{ok, Connection2} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection2, opened}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>},
amqp10_msg:message_annotations(Msg)),
?assertMatch(#{<<"container_id">> := <<"2nd container">>},
amqp10_msg:application_properties(Msg)),
ok = amqp10_client:close_connection(Connection2),

{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection1).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -494,3 +539,18 @@ receive_event(Event) ->
60000 ->
throw({receive_event_timeout, Event})
end.

amqp_init(Config) ->
OpnConf = amqp_connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
{Connection, Session, LinkPair}.

amqp_connection_config(Config) ->
Host = proplists:get_value(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
#{address => Host,
port => Port,
container_id => <<"my container">>,
sasl => {plain, <<"guest">>, <<"guest">>}}.

0 comments on commit f55cd21

Please sign in to comment.