Skip to content

Commit

Permalink
rabbit_db_*: Handle khepri_adv:node_props_map() returns from adv API
Browse files Browse the repository at this point in the history
Khepri 0.17.x will change the return of functions from the khepri_adv
and khepri_tx_adv modules. Previously, functions that target one
specific tree node, for example `khepri_adv:delete/3`, would return
the node props map (`khepri:node_props()`) for the affected node. Now
all of the "adv API" returns `khepri_adv:node_props_map()` for
consistency.
  • Loading branch information
the-mikedavis committed Nov 22, 2024
1 parent c9b3d72 commit 6d289fe
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 41 deletions.
18 changes: 13 additions & 5 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -892,14 +892,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
?KHEPRI_WILDCARD_STAR, %% SrcName
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
?KHEPRI_WILDCARD_STAR), %% RoutingKey
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
Bindings = maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, _SrcName, Kind, Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

Expand Down
29 changes: 26 additions & 3 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,19 @@ update_in_khepri(XName, Fun) ->
Path = khepri_exchange_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := X, payload_version := Vsn}} ->
{ok, QueryRet} ->
{X, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
X1 = Fun(X),
UpdatePath =
khepri_path:combine_with_conditions(
Expand Down Expand Up @@ -534,8 +546,19 @@ next_serial_in_khepri(XName) ->
Path = khepri_exchange_serial_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Serial,
payload_version := Vsn}} ->
{ok, QueryRet} ->
{Serial, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return
%% `khepri_adv:node_props_map()` instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Expand Down
16 changes: 14 additions & 2 deletions deps/rabbit/src/rabbit_db_msup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,20 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
mirroring_pid = Overall,
childspec = ChildSpec},
case rabbit_khepri:adv_get(Path) of
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}} ->
{ok, QueryRet} ->
{#mirrored_sup_childspec{mirroring_pid = Pid}, Vsn} =
case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
case Overall of
Pid ->
Delegate;
Expand Down
80 changes: 62 additions & 18 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,16 @@ delete_in_khepri(QueueName, OnlyDurable) ->
fun () ->
Path = khepri_queue_path(QueueName),
case khepri_tx_adv:delete(Path) of
%% Khepri 0.16 and below returned `khepri:node_props()' for
%% adv queries and commands targeting one node:
{ok, #{data := _}} ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
{ok, #{Path := #{data := _}}} ->
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
{ok, _} ->
ok
end
Expand Down Expand Up @@ -606,7 +612,19 @@ update_in_khepri(QName, Fun) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q, payload_version := Vsn}} ->
{ok, QueryRet} ->
{Q, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Q1 = Fun(Q),
Expand Down Expand Up @@ -657,11 +675,23 @@ update_decorators_in_khepri(QName, Decorators) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q1, payload_version := Vsn}} ->
Q2 = amqqueue:set_decorators(Q1, Decorators),
{ok, QueryRet} ->
{Q, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
Q1 = amqqueue:set_decorators(Q, Decorators),
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Ret2 = rabbit_khepri:put(UpdatePath, Q2),
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
case Ret2 of
ok -> ok;
{error, {khepri, mismatching_node, _}} ->
Expand Down Expand Up @@ -1102,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Res = rabbit_khepri:transaction(
fun() ->
rabbit_misc:fold_while_ok(
fun({Path, QName}, Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
{ok, [{QName, Deletions} | Acc]};
{ok, _} ->
{ok, Acc};
{error, _} = Error ->
Error
end
end, [], Qs)
do_delete_transient_queues_in_khepri_tx(Qs, [])
end),
case Res of
{ok, Items} ->
Expand All @@ -1129,6 +1146,33 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Error
end.

do_delete_transient_queues_in_khepri_tx([], Acc) ->
{ok, Acc};
do_delete_transient_queues_in_khepri_tx([{Path, QName} | Rest], Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, Res} ->
Acc1 = case Res of
%% Khepri 0.16 and below returned `khepri:node_props()'
%% for adv queries and commands targeting one node:
#{data := _} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
[{QName, Deletions} | Acc];
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
[{QName, Deletions} | Acc];
_ ->
Acc
end,
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% foreach_transient().
%% -------------------------------------------------------------------
Expand Down
10 changes: 10 additions & 0 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ set_in_khepri(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case rabbit_khepri:adv_put(Path, Record) of
%% Khepri 0.16 and below returned `khepri:node_props()' for adv queries
%% and commands targeting one node:
{ok, #{data := Params}} ->
{old, Params#runtime_parameters.value};
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
end.
Expand Down Expand Up @@ -114,8 +119,13 @@ set_in_khepri_tx(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case khepri_tx_adv:put(Path, Record) of
%% Khepri 0.16 and below returned `khepri:node_props()' for adv
%% queries and commands targeting one node:
{ok, #{data := Params}} ->
{old, Params#runtime_parameters.value};
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
end.
Expand Down
38 changes: 31 additions & 7 deletions deps/rabbit/src/rabbit_db_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,23 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
Path = khepri_vhost_path(VHostName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := VHost0, payload_version := DVersion}} ->
{ok, QueryRet} ->
{VHost0, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return
%% `khepri_adv:node_props_map()` instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
VHost = vhost:merge_metadata(VHost0, Metadata),
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
Ret2 = rabbit_khepri:put(Path1, VHost),
case Ret2 of
ok ->
Expand Down Expand Up @@ -411,13 +423,25 @@ update_in_mnesia_tx(VHostName, UpdateFun)
update_in_khepri(VHostName, UpdateFun) ->
Path = khepri_vhost_path(VHostName),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := V, payload_version := DVersion}} ->
V1 = UpdateFun(V),
{ok, QueryRet} ->
{VHost0, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return
%% `khepri_adv:node_props_map()` instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
VHost1 = UpdateFun(VHost0),
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
case rabbit_khepri:put(Path1, V1) of
Path, [#if_payload_version{version = Vsn}]),
case rabbit_khepri:put(Path1, VHost1) of
ok ->
V1;
VHost1;
{error, {khepri, mismatching_node, _}} ->
update_in_khepri(VHostName, UpdateFun);
Error ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,25 @@ create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun) ->
create_binding_in_khepri(Src, Dst, Weight, UpdateFun) ->
Path = khepri_consistent_hash_path(Src),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := Chx0, payload_version := DVersion}} ->
{ok, QueryRet} ->
{Chx0, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return
%% `khepri_adv:node_props_map()` instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
case UpdateFun(Chx0, Dst, Weight) of
already_exists ->
already_exists;
Chx ->
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
Ret2 = rabbit_khepri:put(Path1, Chx),
case Ret2 of
ok ->
Expand Down
16 changes: 14 additions & 2 deletions deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,21 @@ create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) ->
Path = khepri_jms_topic_exchange_path(XName),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := BindingFuns, payload_version := DVersion}} ->
{ok, QueryRet} ->
{BindingFuns, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries
%% and commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return
%% `khepri_adv:node_props_map()` instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)),
case Ret of
ok -> ok;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,22 @@ insert0_in_mnesia(Key, Cached, Message, Length) ->
insert_in_khepri(XName, Message, Length) ->
Path = khepri_recent_history_path(XName),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := Cached0, payload_version := DVersion}} ->
{ok, QueryRet} ->
{Cached0, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return
%% `khepri_adv:node_props_map()` instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
Cached = add_to_cache(Cached0, Message, Length),
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
Ret = rabbit_khepri:put(Path1, Cached),
case Ret of
ok ->
Expand Down

0 comments on commit 6d289fe

Please sign in to comment.