Skip to content

Commit

Permalink
Fix issue listing amqp 1.0 connections
Browse files Browse the repository at this point in the history
The issue was related to the supervisor
hierarchy we used to search for the reader
process
  • Loading branch information
MarcialRosales authored and lukebakken committed Sep 27, 2022
1 parent f48e013 commit abdee76
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 61 deletions.
11 changes: 7 additions & 4 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,9 @@ info(Pid, InfoItems) ->
UnknownItems -> throw({bad_argument, UnknownItems})
end.

info_internal(pid, #v1{}) -> self();
info_internal(connection, #v1{connection = Val}) ->
Val;
info_internal(node, #v1{}) -> node();
info_internal(auth_mechanism, #v1{connection = #v1_connection{auth_mechanism = none}}) ->
none;
Expand All @@ -764,11 +767,11 @@ info_internal(frame_max, #v1{connection = #v1_connection{frame_max = Val}}) ->
info_internal(timeout, #v1{connection = #v1_connection{timeout_sec = Val}}) ->
Val;
info_internal(user,
#v1{connection = #v1_connection{user = #user{username = none}}}) ->
'';
info_internal(username,
#v1{connection = #v1_connection{user = #user{username = Val}}}) ->
#v1{connection = #v1_connection{user = {user, Val, _, _}}}) ->
Val;
info_internal(user,
#v1{connection = #v1_connection{user = none}}) ->
'';
info_internal(state, #v1{connection_state = Val}) ->
Val;
info_internal(SockStat, S) when SockStat =:= recv_oct;
Expand Down
131 changes: 74 additions & 57 deletions deps/rabbitmq_amqp1_0/test/command_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,53 +90,15 @@ when_one_connection(_Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename),
Opts = #{node => A, timeout => 2000, verbose => true},

Connection = open_amqp10_connection(_Config),
[Connection,Sender] = open_amqp10_connection(_Config),

[{tracked_connection, _, _, _, _, _, {'AMQP',"1.0"}, _, _, _, _, _}] =
TrackedConnections =
rabbit_ct_broker_helpers:rpc(_Config, A,
rabbit_connection_tracking, list, []),
println("tracked connections", TrackedConnections),

[P1, P2] = [P
|| {_, P, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children, [rabbit_sup])],
println("Pid1", P1),
println("Pid2", P2),

Children1 = rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[P1]),
println("tcp_listener_sup1", Children1),
Children2 = rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[P2]),
println("tcp_listener_sup2", Children2),

[Res1, Res2] = [RanchSupPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [rabbit_sup]),
{_, RanchSupPid, _, [ranch_embedded_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [TcpPid])
],
println("ranch_embedded_sup 1", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Res1])),
println("ranch_embedded_sup 2", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Res2])),

[Rl1, Rl2] = [RanchLPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [rabbit_sup]),
{_, RanchEPid, _, [ranch_embedded_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [TcpPid]),
{_, RanchLPid, _, [ranch_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchEPid])
],
println("ranch_listener_sup", [Rl1, Rl2]),
println("ranch_listener_sup 1", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Rl1])),
println("ranch_listener_sup 2", rabbit_ct_broker_helpers:rpc(_Config, A, supervisor,which_children,[Rl2])),

ReaderPids = [ReaderPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [rabbit_sup]),
{_, RanchEPid, _, [ranch_embedded_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [TcpPid]),
{_, RanchLPid, _, [ranch_listener_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchEPid]),
{_, RanchSPid, _, [ranch_conns_sup_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchLPid]),
{_, RanchCPid, _, [ranch_conns_sup]} <- rabbit_ct_broker_helpers:rpc(_Config, A, supervisor, which_children, [RanchSPid]),
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
],
println("ReaderPid", ReaderPids),
ReaderInfos = [rabbit_ct_broker_helpers:rpc(_Config, A, rabbit_amqp1_0_reader, info, [Pid, [node,user]]) || Pid <- ReaderPids],
println("ReaderPid info",ReaderInfos),

[ExpectedConnection] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
close_amqp10_connection(Connection).
println("list_amqp10", 'Elixir.Enum':to_list(?COMMAND:run([], Opts))),
close_amqp10_connection(Connection, Sender).

when_one_amqp091_connection(_Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename),
Expand Down Expand Up @@ -169,17 +131,72 @@ open_amqp091_client_connection(_Config) ->
close_amqp091_client_connection(Connection) ->
?assertEqual(ok, amqp_connection:close(Connection)).

open_amqp10_connection(_Config) ->
Host = ?config(rmq_hostname, _Config),
Port = rabbit_ct_broker_helpers:get_node_config(_Config, 0, tcp_port_amqp),
% create a configuration map
OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, _} = amqp10_client:begin_session(Connection),
Connection.

close_amqp10_connection(Connection) ->
ok = amqp10_client:close_connection(Connection).
open_amqp10_connection(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QName = atom_to_binary(?FUNCTION_NAME, utf8),
Address = <<"/amq/queue/", QName/binary>>,
%% declare a quorum queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),

% create a configuration map
OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},

% ct:pal("opening connectoin with ~p", [OpnConf]),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session,
SenderLinkName,
Address),

% wait for credit to be received
receive
{amqp10_event, {link, Sender, credited}} -> ok
after 2000 ->
exit(credited_timeout)
end,

OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),

flush("pre-receive"),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"test-receiver">>,
Address),

% grant credit and drain
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),

% wait for a delivery
receive
{amqp10_msg, Receiver, _InMsg} -> println("Received amqp 1.0 message",_InMsg ), ok
after 2000 ->
exit(delivery_timeout)
end,



[Connection, Sender].

flush(Prefix) ->
receive
Msg ->
ct:pal("~s flushed: ~w~n", [Prefix, Msg]),
flush(Prefix)
after 1 ->
ok
end.

close_amqp10_connection(Connection, Sender) ->
flush("final"),
println("Closing connection", Connection),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:close_connection(Connection),
ok.
44 changes: 44 additions & 0 deletions deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using Amqp;
using Amqp.Framing;

// See https://aka.ms/new-console-template for more information
Console.WriteLine("Sending 10 messages to amqp10-queue. Press a button to terminate");

// Create the AMQP connection string
var connectionString = $"amqp://guest:guest@localhost:5672/%2F";

// Create the AMQP connection
var connection = new Connection(new Address(connectionString));

// Create the AMQP session
var amqpSession = new Session(connection);

// Give a name to the sender
var senderSubscriptionId = "rabbitmq.amqp.sender";

// Name of the topic you will be sending messages (Name of the Queue)
var topic = "amqp10-queue";

// Create the AMQP sender
var sender = new SenderLink(amqpSession, senderSubscriptionId, topic);

for (var i = 0; i < 10; i++)
{
// Create message
var message = new Message($"Received message {i}");

// Add a meesage id
message.Properties = new Properties() { MessageId = Guid.NewGuid().ToString() };

// Add some message properties
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["Message.Type.FullName"] = typeof(string).FullName;

// Send message
sender.Send(message);

Task.Delay(2000).Wait();
}

// Wait for a key to close the program
Console.Read();
11 changes: 11 additions & 0 deletions deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Console application to test AMQP 1.0

This is a basic .NetCore console application which uses *AMQP 1.0* .Net **AMQPNetLite** client library.

Usage:
```
dotnet run
```

It sends 10 messages and it waits for key-stroke to terminate which is very convenient
when we need to create AMQP 1.0 connections.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AMQPNetLite" Version="2.4.5" />
</ItemGroup>

</Project>

0 comments on commit abdee76

Please sign in to comment.