Skip to content

Commit

Permalink
queue_SUITE: use a different upstream for each queue on multi-federat…
Browse files Browse the repository at this point in the history
…ion tests
  • Loading branch information
dcorbacho committed Oct 29, 2024
1 parent 385b7e4 commit 583d13c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
28 changes: 15 additions & 13 deletions deps/rabbitmq_federation/test/queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ multiple_downstreams(Config) ->
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]).
expect_federation(Config, Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]).

message_flow(Config) ->
%% TODO: specifc source / target here
Expand Down Expand Up @@ -258,36 +258,37 @@ federate_unfederate(Config) ->
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Config, Ch, <<"upstream2">>, <<"fed2.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),

%% clear the policy
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>),

expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream2">>),
expect_no_federation(Ch, <<"upstream2">>, <<"fed2.downstream">>),

rabbit_ct_broker_helpers:set_policy(Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [
{<<"federation-upstream-set">>, <<"upstream">>}])
end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]).
end, upstream_downstream(Config) ++ [q(<<"fed2.downstream">>, Args)]).

dynamic_plugin_stop_start(Config) ->
DownQ2 = <<"fed.downstream2">>,
DownQ2 = <<"fed2.downstream">>,
Args = ?config(target_queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
UpQ = <<"upstream">>,
UpQ1 = <<"upstream">>,
UpQ2 = <<"upstream2">>,
DownQ1 = <<"fed.downstream">>,
expect_federation(Config, Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Config, Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Config, Ch, UpQ1, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Config, Ch, UpQ2, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),

%% Disable the plugin, the link disappears
ct:pal("Stopping rabbitmq_federation"),
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"),

expect_no_federation(Ch, UpQ, DownQ1),
expect_no_federation(Ch, UpQ, DownQ2),
expect_no_federation(Ch, UpQ1, DownQ1),
expect_no_federation(Ch, UpQ2, DownQ2),

maybe_declare_queue(Config, Ch, q(DownQ1, Args)),
maybe_declare_queue(Config, Ch, q(DownQ2, Args)),
Expand All @@ -305,12 +306,13 @@ dynamic_plugin_stop_start(Config) ->
Entry || Entry <- Status,
proplists:get_value(queue, Entry) =:= DownQ1 orelse
proplists:get_value(queue, Entry) =:= DownQ2,
proplists:get_value(upstream_queue, Entry) =:= UpQ,
proplists:get_value(upstream_queue, Entry) =:= UpQ1 orelse
proplists:get_value(upstream_queue, Entry) =:= UpQ2,
proplists:get_value(status, Entry) =:= running
],
length(L) =:= 2
end),
expect_federation(Config, Ch, UpQ, DownQ1, 120000)
expect_federation(Config, Ch, UpQ1, DownQ1, 120000)
end, upstream_downstream(Config) ++ [q(DownQ2, Args)]).

restart_upstream(Config) ->
Expand Down
11 changes: 8 additions & 3 deletions deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ setup_federation_with_upstream_params(Config, ExtraParams) ->

rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed">>, <<"^fed\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}],
[<<"/">>, <<"fed">>, <<"^fed\\.">>, [{<<"federation-upstream-set">>, <<"upstream">>}],
0, <<"all">>, <<"acting-user">>]),

rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_policy, set,
[<<"/">>, <<"fed2">>, <<"^fed2\\.">>, [{<<"federation-upstream-set">>, <<"upstream2">>}],
0, <<"all">>, <<"acting-user">>]),

rabbit_ct_broker_helpers:rpc(
Expand Down Expand Up @@ -144,10 +149,10 @@ setup_down_federation(Config) ->
{<<"queue">>, <<"upstream">>}]]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
<<"fed">>, <<"^fed\\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
<<"fed">>, <<"^fed\\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
Config.

wait_for_federation(Retries, Fun) ->
Expand Down

0 comments on commit 583d13c

Please sign in to comment.