diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 503b26d5d292..7facfe67cf71 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -210,9 +210,18 @@ handle_http_req(<<"PUT">>, {error, not_found} -> ok = prohibit_cr_lf(XNameBin), ok = prohibit_reserved_amq(XName), - rabbit_exchange:declare( - XName, XTypeAtom, Durable, AutoDelete, - Internal, XArgs, Username) + case rabbit_exchange:declare( + XName, XTypeAtom, Durable, AutoDelete, + Internal, XArgs, Username) of + {ok, DeclaredX} -> + DeclaredX; + {error, timeout} -> + throw( + <<"503">>, + "Could not create ~ts because the operation " + "timed out", + [rabbit_misc:rs(XName)]) + end end, try rabbit_exchange:assert_equivalence( X, XTypeAtom, Durable, AutoDelete, Internal, XArgs) of @@ -285,8 +294,15 @@ handle_http_req(<<"DELETE">>, ok = prohibit_default_exchange(XName), ok = prohibit_reserved_amq(XName), PermCache = check_resource_access(XName, configure, User, PermCache0), - _ = rabbit_exchange:delete(XName, false, Username), - {<<"204">>, null, {PermCache, TopicPermCache}}; + case rabbit_exchange:ensure_deleted(XName, false, Username) of + ok -> + {<<"204">>, null, {PermCache, TopicPermCache}}; + {error, timeout} -> + throw( + <<"503">>, + "failed to delete ~ts due to a timeout", + [rabbit_misc:rs(XName)]) + end; handle_http_req(<<"POST">>, [<<"bindings">>], diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index cdd34b16092c..908892781574 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2512,13 +2512,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, check_not_default_exchange(ExchangeName), check_exchange_deletion(ExchangeName), check_configure_permitted(ExchangeName, User, AuthzContext), - case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of - {error, not_found} -> + case rabbit_exchange:ensure_deleted(ExchangeName, IfUnused, Username) of + ok -> ok; {error, in_use} -> rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]); - ok -> - ok + {error, timeout} -> + rabbit_misc:protocol_error( + internal_error, + "failed to delete ~ts due to a timeout", + [rabbit_misc:rs(ExchangeName)]) end; handle_method(#'queue.purge'{queue = QueueNameBin}, ConnPid, AuthzContext, _CollectorPid, VHostPath, User) -> @@ -2566,13 +2569,22 @@ handle_method(#'exchange.declare'{exchange = XNameBin, check_write_permitted(AName, User, AuthzContext), ok end, - rabbit_exchange:declare(ExchangeName, - CheckedType, - Durable, - AutoDelete, - Internal, - Args, - Username) + case rabbit_exchange:declare(ExchangeName, + CheckedType, + Durable, + AutoDelete, + Internal, + Args, + Username) of + {ok, DeclaredX} -> + DeclaredX; + {error, timeout} -> + rabbit_misc:protocol_error( + internal_error, + "failed to declare ~ts because the operation " + "timed out", + [rabbit_misc:rs(ExchangeName)]) + end end, ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, AutoDelete, Internal, Args); diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 1b0a2382b544..e45edd6dda66 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -265,9 +265,10 @@ count_in_khepri() -> %% update(). %% ------------------------------------------------------------------- --spec update(ExchangeName, UpdateFun) -> ok when +-spec update(ExchangeName, UpdateFun) -> Ret when ExchangeName :: rabbit_exchange:name(), - UpdateFun :: fun((Exchange) -> Exchange). + UpdateFun :: fun((Exchange) -> Exchange), + Ret :: ok | rabbit_khepri:timeout_error(). %% @doc Updates an existing exchange record using the result of %% `UpdateFun'. %% @@ -367,7 +368,9 @@ update_in_khepri_tx(Name, Fun) -> -spec create_or_get(Exchange) -> Ret when Exchange :: rabbit_types:exchange(), - Ret :: {new, Exchange} | {existing, Exchange}. + Ret :: {new, Exchange} | + {existing, Exchange} | + rabbit_khepri:timeout_error(). %% @doc Writes an exchange record if it doesn't exist already or returns %% the existing one. %% @@ -399,7 +402,9 @@ create_or_get_in_khepri(#exchange{name = XName} = X) -> ok -> {new, X}; {error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} -> - {existing, ExistingX} + {existing, ExistingX}; + {error, timeout} = Err -> + Err end. %% ------------------------------------------------------------------- @@ -523,17 +528,15 @@ next_serial_in_khepri(XName) -> UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), - case rabbit_khepri:put(UpdatePath, Serial + 1) of + case rabbit_khepri:put(UpdatePath, Serial + 1, #{timeout => infinity}) of ok -> Serial; {error, {khepri, mismatching_node, _}} -> - next_serial_in_khepri(XName); - Err -> - Err + next_serial_in_khepri(XName) end; _ -> Serial = 1, - ok = rabbit_khepri:put(Path, Serial + 1), + ok = rabbit_khepri:put(Path, Serial + 1, #{timeout => infinity}), Serial end. @@ -560,7 +563,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) -> Exchange :: rabbit_types:exchange(), Binding :: rabbit_types:binding(), Deletions :: dict:dict(), - Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}. + Ret :: {deleted, Exchange, [Binding], Deletions} | + {error, not_found} | + {error, in_use} | + rabbit_khepri:timeout_error(). %% @doc Deletes an exchange record from the database. If `IfUnused' is set %% to `true', it is only deleted when there are no bindings present on the %% exchange. diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index f2d7b512406b..3ffa50594df1 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -731,7 +731,7 @@ update_durable_in_khepri(UpdateFun, FilterFun) -> end, [], Props), Res = rabbit_khepri:transaction( fun() -> - for_each_while_ok( + rabbit_misc:for_each_while_ok( fun({Path, Q}) -> khepri_tx:put(Path, Q) end, Updates) end), @@ -749,16 +749,6 @@ update_durable_in_khepri(UpdateFun, FilterFun) -> Error end. -for_each_while_ok(Fun, [Elem | Rest]) -> - case Fun(Elem) of - ok -> - for_each_while_ok(Fun, Rest); - {error, _} = Error -> - Error - end; -for_each_while_ok(_, []) -> - ok. - %% ------------------------------------------------------------------- %% exists(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl index baa5995b92d4..c6263245b7cf 100644 --- a/deps/rabbit/src/rabbit_definitions.erl +++ b/deps/rabbit/src/rabbit_definitions.erl @@ -863,13 +863,18 @@ add_exchange_int(Exchange, Name, ActingUser) -> undefined -> false; %% =< 2.2.0 I -> I end, - rabbit_exchange:declare(Name, - rabbit_exchange:check_type(maps:get(type, Exchange, undefined)), - maps:get(durable, Exchange, undefined), - maps:get(auto_delete, Exchange, undefined), - Internal, - args(maps:get(arguments, Exchange, undefined)), - ActingUser) + case rabbit_exchange:declare(Name, + rabbit_exchange:check_type(maps:get(type, Exchange, undefined)), + maps:get(durable, Exchange, undefined), + maps:get(auto_delete, Exchange, undefined), + Internal, + args(maps:get(arguments, Exchange, undefined)), + ActingUser) of + {ok, _Exchange} -> + ok; + {error, timeout} = Err -> + throw(Err) + end end. add_binding(Binding, ActingUser) -> diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 22fbaafb69c4..5a00d4de80da 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -13,7 +13,8 @@ lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, update_scratch/3, update_decorators/2, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, - route/2, route/3, delete/3, validate_binding/2, count/0]). + route/2, route/3, delete/3, validate_binding/2, count/0, + ensure_deleted/3]). -export([list_names/0]). -export([serialise_events/1]). -export([serial/1, peek_serial/1]). @@ -91,10 +92,16 @@ serial(X) -> true -> rabbit_db_exchange:next_serial(X#exchange.name) end. --spec declare - (name(), type(), boolean(), boolean(), boolean(), - rabbit_framing:amqp_table(), rabbit_types:username()) - -> rabbit_types:exchange(). +-spec declare(Name, Type, Durable, AutoDelete, Internal, Args, Username) -> + Ret when + Name :: name(), + Type :: type(), + Durable :: boolean(), + AutoDelete :: boolean(), + Internal :: boolean(), + Args :: rabbit_framing:amqp_table(), + Username :: rabbit_types:username(), + Ret :: {ok, rabbit_types:exchange()} | {error, timeout}. declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) -> X = rabbit_exchange_decorator:set( @@ -121,14 +128,16 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) -> Serial = serial(Exchange), ok = callback(X, create, Serial, [Exchange]), rabbit_event:notify(exchange_created, info(Exchange)), - Exchange; + {ok, Exchange}; {existing, Exchange} -> - Exchange + {ok, Exchange}; + {error, timeout} = Err -> + Err end; _ -> rabbit_log:warning("ignoring exchange.declare for exchange ~tp, exchange.delete in progress~n.", [XName]), - X + {ok, X} end. %% Used with binaries sent over the wire; the type may not exist. @@ -444,9 +453,13 @@ cons_if_present(XName, L) -> -spec delete (name(), 'true', rabbit_types:username()) -> - 'ok'| rabbit_types:error('not_found' | 'in_use'); + 'ok' | + rabbit_types:error('not_found' | 'in_use') | + rabbit_khepri:timeout_error(); (name(), 'false', rabbit_types:username()) -> - 'ok' | rabbit_types:error('not_found'). + 'ok' | + rabbit_types:error('not_found') | + rabbit_khepri:timeout_error(). delete(XName, IfUnused, Username) -> try @@ -478,6 +491,26 @@ process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) -> rabbit_binding:add_deletion( XName, {X, deleted, Bs}, Deletions)). +-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when + ExchangeName :: name(), + IfUnused :: boolean(), + Username :: rabbit_types:username(), + Ret :: ok | + rabbit_types:error('in_use') | + rabbit_khepri:timeout_error(). +%% @doc A wrapper around `delete/3' which returns `ok' in the case that the +%% exchange did not exist at time of deletion. + +ensure_deleted(XName, IfUnused, Username) -> + case delete(XName, IfUnused, Username) of + ok -> + ok; + {error, not_found} -> + ok; + {error, _} = Err -> + Err + end. + -spec validate_binding (rabbit_types:exchange(), rabbit_types:binding()) -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}). diff --git a/deps/rabbit/src/rabbit_logger_exchange_h.erl b/deps/rabbit/src/rabbit_logger_exchange_h.erl index 9cdde43b5967..781e4ce6203a 100644 --- a/deps/rabbit/src/rabbit_logger_exchange_h.erl +++ b/deps/rabbit/src/rabbit_logger_exchange_h.erl @@ -148,18 +148,16 @@ wait_for_initial_pass(N) -> end. setup_proc( - #{config := #{exchange := #resource{name = Name, - virtual_host = VHost}}} = Config) -> + #{config := #{exchange := Exchange}} = Config) -> case declare_exchange(Config) of ok -> ?LOG_INFO( - "Logging to exchange '~ts' in vhost '~ts' ready", [Name, VHost], + "Logging to ~ts ready", [rabbit_misc:rs(Exchange)], #{domain => ?RMQLOG_DOMAIN_GLOBAL}); error -> ?LOG_DEBUG( - "Logging to exchange '~ts' in vhost '~ts' not ready, " - "trying again in ~b second(s)", - [Name, VHost, ?DECL_EXCHANGE_INTERVAL_SECS], + "Logging to ~ts not ready, trying again in ~b second(s)", + [rabbit_misc:rs(Exchange), ?DECL_EXCHANGE_INTERVAL_SECS], #{domain => ?RMQLOG_DOMAIN_GLOBAL}), receive stop -> ok @@ -168,36 +166,45 @@ setup_proc( end end. -declare_exchange( - #{config := #{exchange := #resource{name = Name, - virtual_host = VHost} = Exchange}}) -> - try - %% Durable. - #exchange{} = rabbit_exchange:declare( - Exchange, topic, true, false, true, [], - ?INTERNAL_USER), - ?LOG_DEBUG( - "Declared exchange '~ts' in vhost '~ts'", - [Name, VHost], - #{domain => ?RMQLOG_DOMAIN_GLOBAL}), - ok +declare_exchange(#{config := #{exchange := Exchange}}) -> + try rabbit_exchange:declare( + Exchange, topic, true, false, true, [], ?INTERNAL_USER) of + {ok, #exchange{}} -> + ?LOG_DEBUG( + "Declared ~ts", + [rabbit_misc:rs(Exchange)], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + ok; + {error, timeout} -> + ?LOG_DEBUG( + "Could not declare ~ts because the operation timed out", + [rabbit_misc:rs(Exchange)], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + error catch Class:Reason -> ?LOG_DEBUG( - "Could not declare exchange '~ts' in vhost '~ts', " - "reason: ~0p:~0p", - [Name, VHost, Class, Reason], + "Could not declare ~ts, reason: ~0p:~0p", + [rabbit_misc:rs(Exchange), Class, Reason], #{domain => ?RMQLOG_DOMAIN_GLOBAL}), error end. unconfigure_exchange( - #{config := #{exchange := #resource{name = Name, - virtual_host = VHost} = Exchange, + #{config := #{exchange := Exchange, setup_proc := Pid}}) -> Pid ! stop, - _ = rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER), + case rabbit_exchange:ensure_deleted(Exchange, false, ?INTERNAL_USER) of + ok -> + ok; + {error, timeout} -> + ?LOG_ERROR( + "Could not delete ~ts due to a timeout", + [rabbit_misc:rs(Exchange)], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + ok + end, ?LOG_INFO( - "Logging to exchange '~ts' in vhost '~ts' disabled", - [Name, VHost], + "Logging to ~ts disabled", + [rabbit_misc:rs(Exchange)], #{domain => ?RMQLOG_DOMAIN_GLOBAL}). diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 42838f4451dd..fd4b0dc9f3ea 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -201,33 +201,57 @@ do_add(Name, Metadata, ActingUser) -> ok end, rabbit_db_vhost_defaults:apply(Name, ActingUser), - _ = [begin - Resource = rabbit_misc:r(Name, exchange, ExchangeName), - rabbit_log:debug("Will declare an exchange ~tp", [Resource]), - _ = rabbit_exchange:declare(Resource, Type, true, false, Internal, [], ActingUser) - end || {ExchangeName, Type, Internal} <- - [{<<"">>, direct, false}, - {<<"amq.direct">>, direct, false}, - {<<"amq.topic">>, topic, false}, - %% per 0-9-1 pdf - {<<"amq.match">>, headers, false}, - %% per 0-9-1 xml - {<<"amq.headers">>, headers, false}, - {<<"amq.fanout">>, fanout, false}, - {<<"amq.rabbitmq.trace">>, topic, true}]], - case rabbit_vhost_sup_sup:start_on_all_nodes(Name) of + case declare_default_exchanges(Name, ActingUser) of ok -> - rabbit_event:notify(vhost_created, info(VHost) - ++ [{user_who_performed_action, ActingUser}, - {description, Description}, - {tags, Tags}]), - ok; - {error, Reason} -> - Msg = rabbit_misc:format("failed to set up vhost '~ts': ~tp", - [Name, Reason]), + case rabbit_vhost_sup_sup:start_on_all_nodes(Name) of + ok -> + rabbit_event:notify(vhost_created, info(VHost) + ++ [{user_who_performed_action, ActingUser}, + {description, Description}, + {tags, Tags}]), + ok; + {error, Reason} -> + Msg = rabbit_misc:format("failed to set up vhost '~ts': ~tp", + [Name, Reason]), + {error, Msg} + end; + {error, timeout} -> + Msg = rabbit_misc:format( + "failed to set up vhost '~ts' because a timeout occurred " + "while adding default exchanges", + [Name]), {error, Msg} end. +-spec declare_default_exchanges(VHostName, ActingUser) -> Ret when + VHostName :: vhost:name(), + ActingUser :: rabbit_types:username(), + Ret :: ok | {error, timeout}. + +declare_default_exchanges(VHostName, ActingUser) -> + DefaultExchanges = [{<<"">>, direct, false}, + {<<"amq.direct">>, direct, false}, + {<<"amq.topic">>, topic, false}, + %% per 0-9-1 pdf + {<<"amq.match">>, headers, false}, + %% per 0-9-1 xml + {<<"amq.headers">>, headers, false}, + {<<"amq.fanout">>, fanout, false}, + {<<"amq.rabbitmq.trace">>, topic, true}], + rabbit_misc:for_each_while_ok( + fun({ExchangeName, Type, Internal}) -> + Resource = rabbit_misc:r(VHostName, exchange, ExchangeName), + rabbit_log:debug("Will declare an exchange ~tp", [Resource]), + case rabbit_exchange:declare( + Resource, Type, true, false, Internal, [], + ActingUser) of + {ok, _} -> + ok; + {error, timeout} = Err -> + Err + end + end, DefaultExchanges). + -spec update_metadata(vhost:name(), vhost:metadata(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). update_metadata(Name, Metadata0, ActingUser) -> Metadata = maps:with([description, tags, default_queue_type], Metadata0), @@ -275,7 +299,7 @@ delete(VHost, ActingUser) -> assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) end || Q <- rabbit_amqqueue:list(VHost)], rabbit_log:info("Deleting exchanges in vhost '~ts' because it's being deleted", [VHost]), - [assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) || + [ok = rabbit_exchange:ensure_deleted(Name, false, ActingUser) || #exchange{name = Name} <- rabbit_exchange:list(VHost)], rabbit_log:info("Clearing policies and runtime parameters in vhost '~ts' because it's being deleted", [VHost]), _ = rabbit_runtime_parameters:clear_vhost(VHost, ActingUser), diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 5ffb010b2697..b80a09eb1afc 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -873,7 +873,8 @@ delete_queues() -> || Q <- rabbit_amqqueue:list()]. delete_exchange(Name) -> - _ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). + ok = rabbit_exchange:ensure_deleted( + rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). declare(Ch, Q, Args) -> declare(Ch, Q, Args, true). diff --git a/deps/rabbit/test/cluster_minority_SUITE.erl b/deps/rabbit/test/cluster_minority_SUITE.erl index a3ec055a03f6..a6a8f4759ba4 100644 --- a/deps/rabbit/test/cluster_minority_SUITE.erl +++ b/deps/rabbit/test/cluster_minority_SUITE.erl @@ -24,6 +24,7 @@ groups() -> {client_operations, [], [open_connection, open_channel, declare_exchange, + delete_exchange, declare_binding, delete_binding, declare_queue, @@ -100,6 +101,8 @@ init_per_group(Group, Config0) when Group == client_operations; #'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>, source = <<"amq.direct">>, routing_key = <<"binding-to-be-deleted">>}), + %% To be used in delete_exchange + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"exchange-to-be-deleted">>}), %% Lower the default Khepri command timeout. By default this is set %% to 30s in `rabbit_khepri:setup/1' which makes the cases in this @@ -157,6 +160,12 @@ declare_exchange(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"test-exchange">>})). +delete_exchange(Config) -> + [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, + amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"exchange-to-be-deleted">>})). + declare_binding(Config) -> [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), diff --git a/deps/rabbit/test/exchanges_SUITE.erl b/deps/rabbit/test/exchanges_SUITE.erl index b0f5694dce18..e74cd95917e9 100644 --- a/deps/rabbit/test/exchanges_SUITE.erl +++ b/deps/rabbit/test/exchanges_SUITE.erl @@ -340,7 +340,8 @@ delete_queues() -> || Q <- rabbit_amqqueue:list()]. delete_exchange(Name) -> - _ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). + ok = rabbit_exchange:ensure_deleted( + rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). declare(Ch, Q, Args) -> declare(Ch, Q, Args, true). diff --git a/deps/rabbit/test/routing_SUITE.erl b/deps/rabbit/test/routing_SUITE.erl index 49cef1aa61f8..1bbd453ef22b 100644 --- a/deps/rabbit/test/routing_SUITE.erl +++ b/deps/rabbit/test/routing_SUITE.erl @@ -84,9 +84,9 @@ topic(Config) -> topic1(_Config) -> XName = rabbit_misc:r(?VHOST, exchange, <<"topic_matching-exchange">>), - X = rabbit_exchange:declare( - XName, topic, _Durable = true, _AutoDelete = false, - _Internal = false, _Args = [], ?USER), + {ok, X} = rabbit_exchange:declare( + XName, topic, _Durable = true, _AutoDelete = false, + _Internal = false, _Args = [], ?USER), %% add some bindings Bindings = [#binding{source = XName, diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 6324165976e4..af6fc536b046 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -89,6 +89,7 @@ maps_put_falsy/3 ]). -export([remote_sup_child/2]). +-export([for_each_while_ok/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -1632,3 +1633,25 @@ remote_sup_child(Node, Sup) -> [] -> {error, no_child}; {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} end. + +-spec for_each_while_ok(ForEachFun, List) -> Ret when + ForEachFun :: fun((Element) -> ok | {error, ErrReason}), + ErrReason :: any(), + Element :: any(), + List :: [Element], + Ret :: ok | {error, ErrReason}. +%% @doc Calls the given `ForEachFun' for each element in the given `List', +%% short-circuiting if the function returns `{error,_}'. +%% +%% @returns the first `{error,_}' returned by `ForEachFun' or `ok' if +%% `ForEachFun' never returns an error tuple. + +for_each_while_ok(Fun, [Elem | Rest]) -> + case Fun(Elem) of + ok -> + for_each_while_ok(Fun, Rest); + {error, _} = Error -> + Error + end; +for_each_while_ok(_, []) -> + ok. diff --git a/deps/rabbitmq_cli/test/ctl/list_exchanges_command_test.exs b/deps/rabbitmq_cli/test/ctl/list_exchanges_command_test.exs index 1c5f2c8acec0..e8d146ae3139 100644 --- a/deps/rabbitmq_cli/test/ctl/list_exchanges_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/list_exchanges_command_test.exs @@ -96,7 +96,7 @@ defmodule ListExchangesCommandTest do test "run: default options test", context do exchange_name = "test_exchange" - declare_exchange(exchange_name, @vhost) + {:ok, _} = declare_exchange(exchange_name, @vhost) assert MapSet.new(run_command_to_list(@command, [["name", "type"], context[:opts]])) == MapSet.new( @@ -106,8 +106,8 @@ defmodule ListExchangesCommandTest do end test "run: list multiple exchanges", context do - declare_exchange("test_exchange_1", @vhost, :direct) - declare_exchange("test_exchange_2", @vhost, :fanout) + {:ok, _} = declare_exchange("test_exchange_1", @vhost, :direct) + {:ok, _} = declare_exchange("test_exchange_2", @vhost, :fanout) non_default_exchanges = run_command_to_list(@command, [["name", "type"], context[:opts]]) @@ -124,8 +124,8 @@ defmodule ListExchangesCommandTest do end test "run: info keys filter single key", context do - declare_exchange("test_exchange_1", @vhost) - declare_exchange("test_exchange_2", @vhost) + {:ok, _} = declare_exchange("test_exchange_1", @vhost) + {:ok, _} = declare_exchange("test_exchange_2", @vhost) non_default_exchanges = run_command_to_list(@command, [["name"], context[:opts]]) @@ -138,8 +138,8 @@ defmodule ListExchangesCommandTest do end test "run: info keys add additional keys", context do - declare_exchange("durable_exchange", @vhost, :direct, true) - declare_exchange("auto_delete_exchange", @vhost, :fanout, false, true) + {:ok, _} = declare_exchange("durable_exchange", @vhost, :direct, true) + {:ok, _} = declare_exchange("auto_delete_exchange", @vhost, :fanout, false, true) non_default_exchanges = run_command_to_list(@command, [["name", "type", "durable", "auto_delete"], context[:opts]]) @@ -162,8 +162,8 @@ defmodule ListExchangesCommandTest do delete_vhost(other_vhost) end) - declare_exchange("test_exchange_1", @vhost) - declare_exchange("test_exchange_2", other_vhost) + {:ok, _} = declare_exchange("test_exchange_1", @vhost) + {:ok, _} = declare_exchange("test_exchange_2", other_vhost) non_default_exchanges1 = run_command_to_list(@command, [["name"], context[:opts]]) diff --git a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl index 3c75cb03d97e..70251406b20c 100644 --- a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl +++ b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl @@ -38,13 +38,22 @@ info(_X) -> []. info(_X, _) -> []. register() -> - _ = rabbit_exchange:declare(exchange(), topic, true, false, true, [], - ?INTERNAL_USER), - gen_event:add_handler(rabbit_event, ?MODULE, []). + case rabbit_exchange:declare(exchange(), topic, true, false, true, [], + ?INTERNAL_USER) of + {ok, _Exchange} -> + gen_event:add_handler(rabbit_event, ?MODULE, []); + {error, timeout} = Err -> + Err + end. unregister() -> - _ = rabbit_exchange:delete(exchange(), false, ?INTERNAL_USER), - gen_event:delete_handler(rabbit_event, ?MODULE, []). + case rabbit_exchange:ensure_deleted(exchange(), false, ?INTERNAL_USER) of + ok -> + gen_event:delete_handler(rabbit_event, ?MODULE, []), + ok; + {error, _} = Err -> + Err + end. exchange() -> exchange(get_vhost()). diff --git a/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl b/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl index 4cb0096fbc67..dfc3a10086db 100644 --- a/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl +++ b/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl @@ -200,10 +200,12 @@ upstream_validation(_Config) -> ok. with_exchanges(Fun) -> - rabbit_exchange:declare(r(?US_NAME), fanout, false, false, false, [], - <<"acting-user">>), - X = rabbit_exchange:declare(r(?DS_NAME), fanout, false, false, false, [], - <<"acting-user">>), + {ok, _} = rabbit_exchange:declare( + r(?US_NAME), fanout, false, false, false, [], + <<"acting-user">>), + {ok, X} = rabbit_exchange:declare( + r(?DS_NAME), fanout, false, false, false, [], + <<"acting-user">>), Fun(X), %% Delete downstream first or it will recreate the upstream rabbit_exchange:delete(r(?DS_NAME), false, <<"acting-user">>), diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 9137fefc862e..51257fe64a90 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -769,33 +769,38 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> true), CheckedType = rabbit_exchange:check_type(<<"direct">>), ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), - X = case rabbit_exchange:lookup(ExchangeName) of - {ok, FoundX} -> - FoundX; - {error, not_found} -> - rabbit_exchange:declare(ExchangeName, - CheckedType, - true, - false, - false, - Args, - Username) - end, - try - ok = - rabbit_exchange:assert_equivalence(X, - CheckedType, - true, - false, - false, - Args) - catch - exit:ExitError -> - % likely to be a problem of inequivalent args on an existing stream - rabbit_log:error("Error while creating ~tp super stream exchange: " - "~tp", - [Name, ExitError]), - {error, validation_failed} + XResult = case rabbit_exchange:lookup(ExchangeName) of + {ok, FoundX} -> + {ok, FoundX}; + {error, not_found} -> + rabbit_exchange:declare(ExchangeName, + CheckedType, + true, + false, + false, + Args, + Username) + end, + case XResult of + {ok, X} -> + try + ok = + rabbit_exchange:assert_equivalence(X, + CheckedType, + true, + false, + false, + Args) + catch + exit:ExitError -> + % likely to be a problem of inequivalent args on an existing stream + rabbit_log:error("Error while creating ~tp super stream exchange: " + "~tp", + [Name, ExitError]), + {error, validation_failed} + end; + {error, timeout} = Err -> + Err end; error -> {error, validation_failed} @@ -889,11 +894,12 @@ delete_super_stream_exchange(VirtualHost, Name, Username) -> case rabbit_stream_utils:enforce_correct_name(Name) of {ok, CorrectName} -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), - case rabbit_exchange:delete(ExchangeName, false, Username) of - {error, not_found} -> - ok; + case rabbit_exchange:ensure_deleted( + ExchangeName, false, Username) of ok -> - ok + ok; + {error, timeout} = Err -> + Err end; error -> {error, validation_failed}