Skip to content

Commit

Permalink
Merge pull request #11785 from rabbitmq/md/khepri-minority-errors/rab…
Browse files Browse the repository at this point in the history
…bit_db_exchange

Handle timeouts possible in Khepri minority in `rabbit_db_exchange`
  • Loading branch information
the-mikedavis authored Jul 24, 2024
2 parents 1a9da90 + b56abee commit 4207faf
Show file tree
Hide file tree
Showing 17 changed files with 303 additions and 159 deletions.
26 changes: 21 additions & 5 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,18 @@ handle_http_req(<<"PUT">>,
{error, not_found} ->
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_reserved_amq(XName),
rabbit_exchange:declare(
XName, XTypeAtom, Durable, AutoDelete,
Internal, XArgs, Username)
case rabbit_exchange:declare(
XName, XTypeAtom, Durable, AutoDelete,
Internal, XArgs, Username) of
{ok, DeclaredX} ->
DeclaredX;
{error, timeout} ->
throw(
<<"503">>,
"Could not create ~ts because the operation "
"timed out",
[rabbit_misc:rs(XName)])
end
end,
try rabbit_exchange:assert_equivalence(
X, XTypeAtom, Durable, AutoDelete, Internal, XArgs) of
Expand Down Expand Up @@ -285,8 +294,15 @@ handle_http_req(<<"DELETE">>,
ok = prohibit_default_exchange(XName),
ok = prohibit_reserved_amq(XName),
PermCache = check_resource_access(XName, configure, User, PermCache0),
_ = rabbit_exchange:delete(XName, false, Username),
{<<"204">>, null, {PermCache, TopicPermCache}};
case rabbit_exchange:ensure_deleted(XName, false, Username) of
ok ->
{<<"204">>, null, {PermCache, TopicPermCache}};
{error, timeout} ->
throw(
<<"503">>,
"failed to delete ~ts due to a timeout",
[rabbit_misc:rs(XName)])
end;

handle_http_req(<<"POST">>,
[<<"bindings">>],
Expand Down
34 changes: 23 additions & 11 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2512,13 +2512,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
check_not_default_exchange(ExchangeName),
check_exchange_deletion(ExchangeName),
check_configure_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
{error, not_found} ->
case rabbit_exchange:ensure_deleted(ExchangeName, IfUnused, Username) of
ok ->
ok;
{error, in_use} ->
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
ok
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"failed to delete ~ts due to a timeout",
[rabbit_misc:rs(ExchangeName)])
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->
Expand Down Expand Up @@ -2566,13 +2569,22 @@ handle_method(#'exchange.declare'{exchange = XNameBin,
check_write_permitted(AName, User, AuthzContext),
ok
end,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
Internal,
Args,
Username)
case rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
Internal,
Args,
Username) of
{ok, DeclaredX} ->
DeclaredX;
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"failed to declare ~ts because the operation "
"timed out",
[rabbit_misc:rs(ExchangeName)])
end
end,
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
AutoDelete, Internal, Args);
Expand Down
26 changes: 16 additions & 10 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ count_in_khepri() ->
%% update().
%% -------------------------------------------------------------------

-spec update(ExchangeName, UpdateFun) -> ok when
-spec update(ExchangeName, UpdateFun) -> Ret when
ExchangeName :: rabbit_exchange:name(),
UpdateFun :: fun((Exchange) -> Exchange).
UpdateFun :: fun((Exchange) -> Exchange),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Updates an existing exchange record using the result of
%% `UpdateFun'.
%%
Expand Down Expand Up @@ -367,7 +368,9 @@ update_in_khepri_tx(Name, Fun) ->

-spec create_or_get(Exchange) -> Ret when
Exchange :: rabbit_types:exchange(),
Ret :: {new, Exchange} | {existing, Exchange}.
Ret :: {new, Exchange} |
{existing, Exchange} |
rabbit_khepri:timeout_error().
%% @doc Writes an exchange record if it doesn't exist already or returns
%% the existing one.
%%
Expand Down Expand Up @@ -399,7 +402,9 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
ok ->
{new, X};
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
{existing, ExistingX}
{existing, ExistingX};
{error, timeout} = Err ->
Err
end.

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -523,17 +528,15 @@ next_serial_in_khepri(XName) ->
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
case rabbit_khepri:put(UpdatePath, Serial + 1) of
case rabbit_khepri:put(UpdatePath, Serial + 1, #{timeout => infinity}) of
ok ->
Serial;
{error, {khepri, mismatching_node, _}} ->
next_serial_in_khepri(XName);
Err ->
Err
next_serial_in_khepri(XName)
end;
_ ->
Serial = 1,
ok = rabbit_khepri:put(Path, Serial + 1),
ok = rabbit_khepri:put(Path, Serial + 1, #{timeout => infinity}),
Serial
end.

Expand All @@ -560,7 +563,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
Deletions :: dict:dict(),
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
Ret :: {deleted, Exchange, [Binding], Deletions} |
{error, not_found} |
{error, in_use} |
rabbit_khepri:timeout_error().
%% @doc Deletes an exchange record from the database. If `IfUnused' is set
%% to `true', it is only deleted when there are no bindings present on the
%% exchange.
Expand Down
12 changes: 1 addition & 11 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
end, [], Props),
Res = rabbit_khepri:transaction(
fun() ->
for_each_while_ok(
rabbit_misc:for_each_while_ok(
fun({Path, Q}) -> khepri_tx:put(Path, Q) end,
Updates)
end),
Expand All @@ -749,16 +749,6 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
Error
end.

for_each_while_ok(Fun, [Elem | Rest]) ->
case Fun(Elem) of
ok ->
for_each_while_ok(Fun, Rest);
{error, _} = Error ->
Error
end;
for_each_while_ok(_, []) ->
ok.

%% -------------------------------------------------------------------
%% exists().
%% -------------------------------------------------------------------
Expand Down
19 changes: 12 additions & 7 deletions deps/rabbit/src/rabbit_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -863,13 +863,18 @@ add_exchange_int(Exchange, Name, ActingUser) ->
undefined -> false; %% =< 2.2.0
I -> I
end,
rabbit_exchange:declare(Name,
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
maps:get(durable, Exchange, undefined),
maps:get(auto_delete, Exchange, undefined),
Internal,
args(maps:get(arguments, Exchange, undefined)),
ActingUser)
case rabbit_exchange:declare(Name,
rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
maps:get(durable, Exchange, undefined),
maps:get(auto_delete, Exchange, undefined),
Internal,
args(maps:get(arguments, Exchange, undefined)),
ActingUser) of
{ok, _Exchange} ->
ok;
{error, timeout} = Err ->
throw(Err)
end
end.

add_binding(Binding, ActingUser) ->
Expand Down
53 changes: 43 additions & 10 deletions deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
update_scratch/3, update_decorators/2, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
route/2, route/3, delete/3, validate_binding/2, count/0]).
route/2, route/3, delete/3, validate_binding/2, count/0,
ensure_deleted/3]).
-export([list_names/0]).
-export([serialise_events/1]).
-export([serial/1, peek_serial/1]).
Expand Down Expand Up @@ -91,10 +92,16 @@ serial(X) ->
true -> rabbit_db_exchange:next_serial(X#exchange.name)
end.

-spec declare
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:username())
-> rabbit_types:exchange().
-spec declare(Name, Type, Durable, AutoDelete, Internal, Args, Username) ->
Ret when
Name :: name(),
Type :: type(),
Durable :: boolean(),
AutoDelete :: boolean(),
Internal :: boolean(),
Args :: rabbit_framing:amqp_table(),
Username :: rabbit_types:username(),
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.

declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
X = rabbit_exchange_decorator:set(
Expand All @@ -121,14 +128,16 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
Serial = serial(Exchange),
ok = callback(X, create, Serial, [Exchange]),
rabbit_event:notify(exchange_created, info(Exchange)),
Exchange;
{ok, Exchange};
{existing, Exchange} ->
Exchange
{ok, Exchange};
{error, timeout} = Err ->
Err
end;
_ ->
rabbit_log:warning("ignoring exchange.declare for exchange ~tp,
exchange.delete in progress~n.", [XName]),
X
{ok, X}
end.

%% Used with binaries sent over the wire; the type may not exist.
Expand Down Expand Up @@ -444,9 +453,13 @@ cons_if_present(XName, L) ->

-spec delete
(name(), 'true', rabbit_types:username()) ->
'ok'| rabbit_types:error('not_found' | 'in_use');
'ok' |
rabbit_types:error('not_found' | 'in_use') |
rabbit_khepri:timeout_error();
(name(), 'false', rabbit_types:username()) ->
'ok' | rabbit_types:error('not_found').
'ok' |
rabbit_types:error('not_found') |
rabbit_khepri:timeout_error().

delete(XName, IfUnused, Username) ->
try
Expand Down Expand Up @@ -478,6 +491,26 @@ process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
rabbit_binding:add_deletion(
XName, {X, deleted, Bs}, Deletions)).

-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when
ExchangeName :: name(),
IfUnused :: boolean(),
Username :: rabbit_types:username(),
Ret :: ok |
rabbit_types:error('in_use') |
rabbit_khepri:timeout_error().
%% @doc A wrapper around `delete/3' which returns `ok' in the case that the
%% exchange did not exist at time of deletion.

ensure_deleted(XName, IfUnused, Username) ->
case delete(XName, IfUnused, Username) of
ok ->
ok;
{error, not_found} ->
ok;
{error, _} = Err ->
Err
end.

-spec validate_binding
(rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
Expand Down
61 changes: 34 additions & 27 deletions deps/rabbit/src/rabbit_logger_exchange_h.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,16 @@ wait_for_initial_pass(N) ->
end.

setup_proc(
#{config := #{exchange := #resource{name = Name,
virtual_host = VHost}}} = Config) ->
#{config := #{exchange := Exchange}} = Config) ->
case declare_exchange(Config) of
ok ->
?LOG_INFO(
"Logging to exchange '~ts' in vhost '~ts' ready", [Name, VHost],
"Logging to ~ts ready", [rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL});
error ->
?LOG_DEBUG(
"Logging to exchange '~ts' in vhost '~ts' not ready, "
"trying again in ~b second(s)",
[Name, VHost, ?DECL_EXCHANGE_INTERVAL_SECS],
"Logging to ~ts not ready, trying again in ~b second(s)",
[rabbit_misc:rs(Exchange), ?DECL_EXCHANGE_INTERVAL_SECS],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
receive
stop -> ok
Expand All @@ -168,36 +166,45 @@ setup_proc(
end
end.

declare_exchange(
#{config := #{exchange := #resource{name = Name,
virtual_host = VHost} = Exchange}}) ->
try
%% Durable.
#exchange{} = rabbit_exchange:declare(
Exchange, topic, true, false, true, [],
?INTERNAL_USER),
?LOG_DEBUG(
"Declared exchange '~ts' in vhost '~ts'",
[Name, VHost],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok
declare_exchange(#{config := #{exchange := Exchange}}) ->
try rabbit_exchange:declare(
Exchange, topic, true, false, true, [], ?INTERNAL_USER) of
{ok, #exchange{}} ->
?LOG_DEBUG(
"Declared ~ts",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok;
{error, timeout} ->
?LOG_DEBUG(
"Could not declare ~ts because the operation timed out",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
error
catch
Class:Reason ->
?LOG_DEBUG(
"Could not declare exchange '~ts' in vhost '~ts', "
"reason: ~0p:~0p",
[Name, VHost, Class, Reason],
"Could not declare ~ts, reason: ~0p:~0p",
[rabbit_misc:rs(Exchange), Class, Reason],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
error
end.

unconfigure_exchange(
#{config := #{exchange := #resource{name = Name,
virtual_host = VHost} = Exchange,
#{config := #{exchange := Exchange,
setup_proc := Pid}}) ->
Pid ! stop,
_ = rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER),
case rabbit_exchange:ensure_deleted(Exchange, false, ?INTERNAL_USER) of
ok ->
ok;
{error, timeout} ->
?LOG_ERROR(
"Could not delete ~ts due to a timeout",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok
end,
?LOG_INFO(
"Logging to exchange '~ts' in vhost '~ts' disabled",
[Name, VHost],
"Logging to ~ts disabled",
[rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}).
Loading

0 comments on commit 4207faf

Please sign in to comment.