Skip to content

Commit

Permalink
Migrate from supervisor2 to supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk authored and lhoguin committed Sep 27, 2022
1 parent c4bc43b commit 2855278
Show file tree
Hide file tree
Showing 38 changed files with 800 additions and 426 deletions.
52 changes: 35 additions & 17 deletions deps/amqp_client/src/amqp_channel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

-include("amqp_client_internal.hrl").

-behaviour(supervisor2).
-behaviour(supervisor).

-export([start_link/6]).
-export([init/1]).
Expand All @@ -22,13 +22,17 @@
start_link(Type, Connection, ConnName, InfraArgs, ChNumber,
Consumer = {_, _}) ->
Identity = {ConnName, ChNumber},
{ok, Sup} = supervisor2:start_link(?MODULE, [Consumer, Identity]),
[{gen_consumer, ConsumerPid, _, _}] = supervisor2:which_children(Sup),
{ok, ChPid} = supervisor2:start_child(
Sup, {channel,
{amqp_channel, start_link,
[Type, Connection, ChNumber, ConsumerPid, Identity]},
intrinsic, ?WORKER_WAIT, worker, [amqp_channel]}),
{ok, Sup} = supervisor:start_link(?MODULE, [Consumer, Identity]),
[{gen_consumer, ConsumerPid, _, _}] = supervisor:which_children(Sup),
StartMFA = {amqp_channel, start_link, [Type, Connection, ChNumber, ConsumerPid, Identity]},
ChildSpec = #{id => channel,
start => StartMFA,
restart => transient,
significant => true,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [amqp_channel]},
{ok, ChPid} = supervisor:start_child(Sup, ChildSpec),
case start_writer(Sup, Type, InfraArgs, ConnName, ChNumber, ChPid) of
{ok, Writer} ->
amqp_channel:set_writer(ChPid, Writer),
Expand Down Expand Up @@ -59,22 +63,36 @@ start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
end;
start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
supervisor2:start_child(
Sup,
{writer, {rabbit_writer, start_link,
StartMFA = {rabbit_writer, start_link,
[Sock, ChNumber, FrameMax, ?PROTOCOL, ChPid,
{ConnName, ChNumber}, false, GCThreshold]},
transient, ?WORKER_WAIT, worker, [rabbit_writer]}).
ChildSpec = #{id => writer,
start => StartMFA,
restart => transient,
significant => true,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [rabbit_writer]},
supervisor:start_child(Sup, ChildSpec).

init_command_assembler(direct) -> {ok, none};
init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL).

%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%% supervisor callbacks
%%---------------------------------------------------------------------------

init([{ConsumerModule, ConsumerArgs}, Identity]) ->
{ok, {{one_for_all, 0, 1},
[{gen_consumer, {amqp_gen_consumer, start_link,
[ConsumerModule, ConsumerArgs, Identity]},
intrinsic, ?WORKER_WAIT, worker, [amqp_gen_consumer]}]}}.
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1,
auto_shutdown => any_significant},
ChildStartMFA = {amqp_gen_consumer, start_link, [ConsumerModule, ConsumerArgs, Identity]},
ChildSpec = #{id => gen_consumer,
start => ChildStartMFA,
restart => transient,
significant => true,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [amqp_gen_consumer]},
{ok, {SupFlags, [ChildSpec]}}.
21 changes: 13 additions & 8 deletions deps/amqp_client/src/amqp_channel_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

-include("amqp_client.hrl").

-behaviour(supervisor2).
-behaviour(supervisor).

-export([start_link/3, start_channel_sup/4]).
-export([init/1]).
Expand All @@ -20,17 +20,22 @@
%%---------------------------------------------------------------------------

start_link(Type, Connection, ConnName) ->
supervisor2:start_link(?MODULE, [Type, Connection, ConnName]).
supervisor:start_link(?MODULE, [Type, Connection, ConnName]).

start_channel_sup(Sup, InfraArgs, ChannelNumber, Consumer) ->
supervisor2:start_child(Sup, [InfraArgs, ChannelNumber, Consumer]).
supervisor:start_child(Sup, [InfraArgs, ChannelNumber, Consumer]).

%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%% supervisor callbacks
%%---------------------------------------------------------------------------

init([Type, Connection, ConnName]) ->
{ok, {{simple_one_for_one, 0, 1},
[{channel_sup,
{amqp_channel_sup, start_link, [Type, Connection, ConnName]},
temporary, infinity, supervisor, [amqp_channel_sup]}]}}.
SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1},
ChildStartMFA = {amqp_channel_sup, start_link, [Type, Connection, ConnName]},
ChildSpec = #{id => channel_sup,
start => ChildStartMFA,
restart => temporary,
shutdown => infinity,
type => supervisor,
modules => [amqp_channel_sup]},
{ok, {SupFlags, [ChildSpec]}}.
42 changes: 28 additions & 14 deletions deps/amqp_client/src/amqp_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

-include("amqp_client.hrl").

-behaviour(supervisor2).
-behaviour(supervisor).

-export([start_link/1]).
-export([init/1]).
Expand All @@ -20,22 +20,36 @@
%%---------------------------------------------------------------------------

start_link(AMQPParams) ->
{ok, Sup} = supervisor2:start_link(?MODULE, []),
{ok, TypeSup} = supervisor2:start_child(
Sup, {connection_type_sup,
{amqp_connection_type_sup, start_link, []},
transient, ?SUPERVISOR_WAIT, supervisor,
[amqp_connection_type_sup]}),
{ok, Connection} = supervisor2:start_child(
Sup, {connection, {amqp_gen_connection, start_link,
[TypeSup, AMQPParams]},
intrinsic, brutal_kill, worker,
[amqp_gen_connection]}),
{ok, Sup} = supervisor:start_link(?MODULE, []),

StartMFA0 = {amqp_connection_type_sup, start_link, []},
ChildSpec0 = #{id => connection_type_sup,
start => StartMFA0,
restart => transient,
shutdown => ?SUPERVISOR_WAIT,
type => supervisor,
modules => [amqp_connection_type_sup]},
{ok, TypeSup} = supervisor:start_child(Sup, ChildSpec0),

StartMFA1 = {amqp_gen_connection, start_link, [TypeSup, AMQPParams]},
ChildSpec1 = #{id => connection,
start => StartMFA1,
restart => transient,
significant => true,
shutdown => brutal_kill,
type => worker,
modules => [amqp_gen_connection]},
{ok, Connection} = supervisor:start_child(Sup, ChildSpec1),

{ok, Sup, Connection}.

%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%% supervisor callbacks
%%---------------------------------------------------------------------------

init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1,
auto_shutdown => any_significant},
{ok, {SupFlags, []}}.
105 changes: 63 additions & 42 deletions deps/amqp_client/src/amqp_connection_type_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

-include("amqp_client_internal.hrl").

-behaviour(supervisor2).
-behaviour(supervisor).

-export([start_link/0, start_infrastructure_fun/3, type_module/1]).

Expand All @@ -21,71 +21,92 @@
%%---------------------------------------------------------------------------

start_link() ->
supervisor2:start_link(?MODULE, []).
supervisor:start_link(?MODULE, []).

type_module(#amqp_params_direct{}) -> {direct, amqp_direct_connection};
type_module(#amqp_params_network{}) -> {network, amqp_network_connection}.

%%---------------------------------------------------------------------------

start_channels_manager(Sup, Conn, ConnName, Type) ->
{ok, ChSupSup} = supervisor2:start_child(
Sup,
{channel_sup_sup, {amqp_channel_sup_sup, start_link,
[Type, Conn, ConnName]},
intrinsic, ?SUPERVISOR_WAIT, supervisor,
[amqp_channel_sup_sup]}),
{ok, _} = supervisor2:start_child(
Sup,
{channels_manager, {amqp_channels_manager, start_link,
[Conn, ConnName, ChSupSup]},
transient, ?WORKER_WAIT, worker, [amqp_channels_manager]}).
StartMFA0 = {amqp_channel_sup_sup, start_link, [Type, Conn, ConnName]},
ChildSpec0 = #{id => channel_sup_sup,
start => StartMFA0,
restart => transient,
significant => true,
shutdown => ?SUPERVISOR_WAIT,
type => supervisor,
modules => [amqp_channel_sup_sup]},
{ok, ChSupSup} = supervisor:start_child(Sup, ChildSpec0),

StartMFA1 = {amqp_channels_manager, start_link, [Conn, ConnName, ChSupSup]},
ChildSpec1 = #{id => channels_manager,
start => StartMFA1,
restart => transient,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [amqp_channels_manager]},
{ok, _} = supervisor:start_child(Sup, ChildSpec1).

start_infrastructure_fun(Sup, Conn, network) ->
fun (Sock, ConnName) ->
{ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, network),
{ok, AState} = rabbit_command_assembler:init(?PROTOCOL),
{ok, GCThreshold} = application:get_env(amqp_client, writer_gc_threshold),
{ok, Writer} =
supervisor2:start_child(
Sup,
{writer,
{rabbit_writer, start_link,
[Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Conn, ConnName,
false, GCThreshold]},
transient, ?WORKER_WAIT, worker, [rabbit_writer]}),
{ok, Reader} =
supervisor2:start_child(
Sup,
{main_reader, {amqp_main_reader, start_link,
[Sock, Conn, ChMgr, AState, ConnName]},
transient, ?WORKER_WAIT, worker, [amqp_main_reader]}),

WriterStartMFA = {rabbit_writer, start_link,
[Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Conn,
ConnName, false, GCThreshold]},
WriterChildSpec = #{id => writer,
start => WriterStartMFA,
restart => transient,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [rabbit_writer]},
{ok, Writer} = supervisor:start_child(Sup, WriterChildSpec),

ReaderStartMFA = {amqp_main_reader, start_link,
[Sock, Conn, ChMgr, AState, ConnName]},
ReaderChildSpec = #{id => main_reader,
start => ReaderStartMFA,
restart => transient,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [amqp_main_reader]},
{ok, Reader} = supervisor:start_child(Sup, ReaderChildSpec),
case rabbit_net:controlling_process(Sock, Reader) of
ok ->
case amqp_main_reader:post_init(Reader) of
ok ->
{ok, ChMgr, Writer};
{error, Reason} ->
ok ->
case amqp_main_reader:post_init(Reader) of
ok ->
{ok, ChMgr, Writer};
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end
end;
start_infrastructure_fun(Sup, Conn, direct) ->
fun (ConnName) ->
{ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, direct),
{ok, Collector} =
supervisor2:start_child(
Sup,
{collector, {rabbit_queue_collector, start_link, [ConnName]},
transient, ?WORKER_WAIT, worker, [rabbit_queue_collector]}),
StartMFA = {rabbit_queue_collector, start_link, [ConnName]},
ChildSpec = #{id => collector,
start => StartMFA,
restart => transient,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [rabbit_queue_collector]},
{ok, Collector} = supervisor:start_child(Sup, ChildSpec),
{ok, ChMgr, Collector}
end.

%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%% supervisor callbacks
%%---------------------------------------------------------------------------

init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1,
auto_shutdown => any_significant},
{ok, {SupFlags, []}}.
21 changes: 14 additions & 7 deletions deps/amqp_client/src/amqp_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

-include("amqp_client.hrl").

-behaviour(supervisor2).
-behaviour(supervisor).

-export([start_link/0, is_ready/0, start_connection_sup/1]).
-export([init/1]).
Expand All @@ -20,19 +20,26 @@
%%---------------------------------------------------------------------------

start_link() ->
supervisor2:start_link({local, amqp_sup}, ?MODULE, []).
supervisor:start_link({local, amqp_sup}, ?MODULE, []).

is_ready() ->
whereis(amqp_sup) =/= undefined.

start_connection_sup(AmqpParams) ->
supervisor2:start_child(amqp_sup, [AmqpParams]).
supervisor:start_child(amqp_sup, [AmqpParams]).

%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%% supervisor callbacks
%%---------------------------------------------------------------------------

init([]) ->
{ok, {{simple_one_for_one, 0, 1},
[{connection_sup, {amqp_connection_sup, start_link, []},
temporary, ?SUPERVISOR_WAIT, supervisor, [amqp_connection_sup]}]}}.
SupFlags = #{strategy => simple_one_for_one,
intensity => 0,
period => 1},
ChildSpec = #{id => connection_sup,
start => {amqp_connection_sup, start_link, []},
restart => temporary,
shutdown => ?SUPERVISOR_WAIT,
type => supervisor,
modules => [amqp_connection_sup]},
{ok, {SupFlags, [ChildSpec]}}.
Loading

0 comments on commit 2855278

Please sign in to comment.