From f55cd21e52b292c6d1c78451858abe077357006d Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 29 Oct 2024 11:22:32 +0100 Subject: [PATCH] Add AMQP 1.0 event exchange test --- deps/rabbitmq_event_exchange/Makefile | 2 +- .../test/system_SUITE.erl | 104 ++++++++++++++---- 2 files changed, 83 insertions(+), 23 deletions(-) diff --git a/deps/rabbitmq_event_exchange/Makefile b/deps/rabbitmq_event_exchange/Makefile index f1f5ff81d952..fdac1be67e6e 100644 --- a/deps/rabbitmq_event_exchange/Makefile +++ b/deps/rabbitmq_event_exchange/Makefile @@ -6,7 +6,7 @@ define PROJECT_APP_EXTRA_KEYS endef DEPS = rabbit_common rabbit -TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client +TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_amqp_client DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk diff --git a/deps/rabbitmq_event_exchange/test/system_SUITE.erl b/deps/rabbitmq_event_exchange/test/system_SUITE.erl index 76d9199a586c..4610378131ea 100644 --- a/deps/rabbitmq_event_exchange/test/system_SUITE.erl +++ b/deps/rabbitmq_event_exchange/test/system_SUITE.erl @@ -17,28 +17,41 @@ all() -> [ - queue_created, - authentication, - audit_queue, - audit_exchange, - audit_exchange_internal_parameter, - audit_binding, - audit_vhost, - audit_vhost_deletion, - audit_channel, - audit_connection, - audit_direct_connection, - audit_consumer, - audit_parameter, - audit_policy, - audit_vhost_limit, - audit_user, - audit_user_password, - audit_user_tags, - audit_permission, - audit_topic_permission, - resource_alarm, - unregister + {group, amqp}, + {group, amqpl} + ]. + +groups() -> + [ + {amqp, [shuffle], + [ + amqp_connection + ]}, + {amqpl, [], + [ + queue_created, + authentication, + audit_queue, + audit_exchange, + audit_exchange_internal_parameter, + audit_binding, + audit_vhost, + audit_vhost_deletion, + audit_channel, + audit_connection, + audit_direct_connection, + audit_consumer, + audit_parameter, + audit_policy, + audit_vhost_limit, + audit_user, + audit_user_password, + audit_user_tags, + audit_permission, + audit_topic_permission, + resource_alarm, + unregister + ]} ]. %% ------------------------------------------------------------------- @@ -60,6 +73,9 @@ end_per_suite(Config) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). +init_per_group(amqp, Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), + Config; init_per_group(_, Config) -> Config. @@ -453,6 +469,35 @@ unregister(Config) -> lookup, [X])), ok. +%% Test that the event exchange works when publising and consuming via AMQP 1.0. +amqp_connection(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {Connection1, Session, LinkPair} = amqp_init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName,#{}), + ok = rabbitmq_amqp_client:bind_queue( + LinkPair, QName, <<"amq.rabbitmq.event">>, <<"connection.*">>, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + + OpnConf0 = amqp_connection_config(Config), + OpnConf = maps:update(container_id, <<"2nd container">>, OpnConf0), + {ok, Connection2} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection2, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>}, + amqp10_msg:message_annotations(Msg)), + ?assertMatch(#{<<"container_id">> := <<"2nd container">>}, + amqp10_msg:application_properties(Msg)), + ok = amqp10_client:close_connection(Connection2), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection1). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- @@ -494,3 +539,18 @@ receive_event(Event) -> 60000 -> throw({receive_event_timeout, Event}) end. + +amqp_init(Config) -> + OpnConf = amqp_connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {Connection, Session, LinkPair}. + +amqp_connection_config(Config) -> + Host = proplists:get_value(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => {plain, <<"guest">>, <<"guest">>}}.