diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl index 4ba57f9c2c11..1c3d182020de 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl @@ -750,6 +750,9 @@ info(Pid, InfoItems) -> UnknownItems -> throw({bad_argument, UnknownItems}) end. +info_internal(pid, #v1{}) -> self(); +info_internal(connection, #v1{connection = Val}) -> + Val; info_internal(node, #v1{}) -> node(); info_internal(auth_mechanism, #v1{connection = #v1_connection{auth_mechanism = none}}) -> none; @@ -764,11 +767,11 @@ info_internal(frame_max, #v1{connection = #v1_connection{frame_max = Val}}) -> info_internal(timeout, #v1{connection = #v1_connection{timeout_sec = Val}}) -> Val; info_internal(user, - #v1{connection = #v1_connection{user = #user{username = none}}}) -> - ''; -info_internal(username, - #v1{connection = #v1_connection{user = #user{username = Val}}}) -> + #v1{connection = #v1_connection{user = {user, Val, _, _}}}) -> Val; +info_internal(user, + #v1{connection = #v1_connection{user = none}}) -> + ''; info_internal(state, #v1{connection_state = Val}) -> Val; info_internal(SockStat, S) when SockStat =:= recv_oct; diff --git a/deps/rabbitmq_amqp1_0/test/command_SUITE.erl b/deps/rabbitmq_amqp1_0/test/command_SUITE.erl index 974f24cbed97..4ffac6ac14fa 100644 --- a/deps/rabbitmq_amqp1_0/test/command_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/command_SUITE.erl @@ -90,53 +90,15 @@ when_one_connection(_Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename), Opts = #{node => A, timeout => 2000, verbose => true}, - Connection = open_amqp10_connection(_Config), + [Connection,Sender] = open_amqp10_connection(_Config), - [{tracked_connection, _, _, _, _, _, {'AMQP',"1.0"}, _, _, _, _, _}] = + TrackedConnections = rabbit_ct_broker_helpers:rpc(_Config, A, rabbit_connection_tracking, list, []), + println("tracked connections", TrackedConnections), - [P1, P2] = [P - || {_, P, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children, [rabbit_sup])], - println("Pid1", P1), - println("Pid2", P2), - - Children1 = rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[P1]), - println("tcp_listener_sup1", Children1), - Children2 = rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[P2]), - println("tcp_listener_sup2", Children2), - - [Res1, Res2] = [RanchSupPid - || {_, TcpPid, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [rabbit_sup]), - {_, RanchSupPid, _, [ranch_embedded_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [TcpPid]) - ], - println("ranch_embedded_sup 1", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Res1])), - println("ranch_embedded_sup 2", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Res2])), - - [Rl1, Rl2] = [RanchLPid - || {_, TcpPid, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [rabbit_sup]), - {_, RanchEPid, _, [ranch_embedded_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [TcpPid]), - {_, RanchLPid, _, [ranch_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchEPid]) - ], - println("ranch_listener_sup", [Rl1, Rl2]), - println("ranch_listener_sup 1", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Rl1])), - println("ranch_listener_sup 2", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Rl2])), - - ReaderPids = [ReaderPid - || {_, TcpPid, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [rabbit_sup]), - {_, RanchEPid, _, [ranch_embedded_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [TcpPid]), - {_, RanchLPid, _, [ranch_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchEPid]), - {_, RanchSPid, _, [ranch_conns_sup_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchLPid]), - {_, RanchCPid, _, [ranch_conns_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchSPid]), - {rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid), - {reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid) - ], - println("ReaderPid", ReaderPids), - ReaderInfos = [rabbit_ct_broker_helpers:rpc(_Config, A, rabbit_amqp1_0_reader, info, [Pid, [node,user]]) || Pid <- ReaderPids], - println("ReaderPid info",ReaderInfos), - - [ExpectedConnection] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)), - close_amqp10_connection(Connection). + println("list_amqp10", 'Elixir.Enum':to_list(?COMMAND:run([], Opts))), + close_amqp10_connection(Connection, Sender). when_one_amqp091_connection(_Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename), @@ -169,17 +131,72 @@ open_amqp091_client_connection(_Config) -> close_amqp091_client_connection(Connection) -> ?assertEqual(ok, amqp_connection:close(Connection)). -open_amqp10_connection(_Config) -> - Host = ?config(rmq_hostname, _Config), - Port = rabbit_ct_broker_helpers:get_node_config(_Config, 0, tcp_port_amqp), - % create a configuration map - OpnConf = #{address => Host, - port => Port, - container_id => atom_to_binary(?FUNCTION_NAME, utf8), - sasl => {plain, <<"guest">>, <<"guest">>}}, - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, _} = amqp10_client:begin_session(Connection), - Connection. - -close_amqp10_connection(Connection) -> - ok = amqp10_client:close_connection(Connection). +open_amqp10_connection(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + QName = atom_to_binary(?FUNCTION_NAME, utf8), + Address = <<"/amq/queue/", QName/binary>>, + %% declare a quorum queue + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + + % create a configuration map + OpnConf = #{address => Host, + port => Port, + container_id => atom_to_binary(?FUNCTION_NAME, utf8), + sasl => {plain, <<"guest">>, <<"guest">>}}, + + % ct:pal("opening connectoin with ~p", [OpnConf]), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + SenderLinkName = <<"test-sender">>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, + SenderLinkName, + Address), + + % wait for credit to be received + receive + {amqp10_event, {link, Sender, credited}} -> ok + after 2000 -> + exit(credited_timeout) + end, + + OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true), + ok = amqp10_client:send_msg(Sender, OutMsg), + + flush("pre-receive"), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, + <<"test-receiver">>, + Address), + + % grant credit and drain + ok = amqp10_client:flow_link_credit(Receiver, 1, never, true), + + % wait for a delivery + receive + {amqp10_msg, Receiver, _InMsg} -> println("Received amqp 1.0 message",_InMsg ), ok + after 2000 -> + exit(delivery_timeout) + end, + + + + [Connection, Sender]. + +flush(Prefix) -> + receive + Msg -> + ct:pal("~s flushed: ~w~n", [Prefix, Msg]), + flush(Prefix) + after 1 -> + ok + end. + +close_amqp10_connection(Connection, Sender) -> + flush("final"), + println("Closing connection", Connection), + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:close_connection(Connection), + ok. diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/Program.cs b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/Program.cs new file mode 100644 index 000000000000..a4c69b3d7c3a --- /dev/null +++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/Program.cs @@ -0,0 +1,44 @@ +using Amqp; +using Amqp.Framing; + +// See https://aka.ms/new-console-template for more information +Console.WriteLine("Sending 10 messages to amqp10-queue. Press a button to terminate"); + +// Create the AMQP connection string +var connectionString = $"amqp://guest:guest@localhost:5672/%2F"; + +// Create the AMQP connection +var connection = new Connection(new Address(connectionString)); + +// Create the AMQP session +var amqpSession = new Session(connection); + + // Give a name to the sender +var senderSubscriptionId = "rabbitmq.amqp.sender"; + +// Name of the topic you will be sending messages (Name of the Queue) +var topic = "amqp10-queue"; + +// Create the AMQP sender +var sender = new SenderLink(amqpSession, senderSubscriptionId, topic); + +for (var i = 0; i < 10; i++) +{ + // Create message + var message = new Message($"Received message {i}"); + + // Add a meesage id + message.Properties = new Properties() { MessageId = Guid.NewGuid().ToString() }; + + // Add some message properties + message.ApplicationProperties = new ApplicationProperties(); + message.ApplicationProperties["Message.Type.FullName"] = typeof(string).FullName; + + // Send message + sender.Send(message); + + Task.Delay(2000).Wait(); +} + + // Wait for a key to close the program +Console.Read(); diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/README.md b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/README.md new file mode 100644 index 000000000000..ae3164c970c9 --- /dev/null +++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/README.md @@ -0,0 +1,11 @@ +# Console application to test AMQP 1.0 + +This is a basic .NetCore console application which uses *AMQP 1.0* .Net **AMQPNetLite** client library. + +Usage: +``` +dotnet run +``` + +It sends 10 messages and it waits for key-stroke to terminate which is very convenient +when we need to create AMQP 1.0 connections. diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/standalone.csproj b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/standalone.csproj new file mode 100644 index 000000000000..2341b149952f --- /dev/null +++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/standalone.csproj @@ -0,0 +1,13 @@ + + + + Exe + net6.0 + enable + enable + + + + + +