Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump Khepri to 0.17.0 #12753

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ erlang_package.hex_package(
version = "1.4.1",
)

erlang_package.hex_package(
erlang_package.git_package(
name = "khepri",
build_file = "@rabbitmq-server//bazel:BUILD.khepri",
sha256 = "feee8a0a1f3f78dd9f8860feacba63cc165c81af1b351600903e34a20676d5f6",
version = "0.16.0",
repository = "rabbitmq/khepri",
commit = "3aacfb5e873871e1c6bca6a1f71d539bd668e119",
)

erlang_package.hex_package(
Expand Down Expand Up @@ -253,8 +253,8 @@ erlang_package.hex_package(
name = "ra",
build_file = "@rabbitmq-server//bazel:BUILD.ra",
pkg = "ra",
sha256 = "1d553dd971a0b398b7af0fa8c8458dda575715ff71c65c972e9500b24039b240",
version = "2.14.0",
sha256 = "cfc0dbe5ebbd54f44081f95ea6a1daeb28a89df82aa9baa234f68abbb36bdc67",
version = "2.15.0",
)

erlang_package.git_package(
Expand Down
3 changes: 2 additions & 1 deletion bazel/BUILD.khepri
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ filegroup(
"src/khepri_path.erl",
"src/khepri_pattern_tree.erl",
"src/khepri_payload.erl",
"src/khepri_prefix_tree.erl",
"src/khepri_projection.erl",
"src/khepri_sproc.erl",
"src/khepri_sup.erl",
Expand All @@ -74,10 +75,10 @@ filegroup(
"src/khepri_error.hrl",
"src/khepri_evf.hrl",
"src/khepri_machine.hrl",
"src/khepri_node.hrl",
"src/khepri_payload.hrl",
"src/khepri_projection.hrl",
"src/khepri_ret.hrl",
"src/khepri_tree.hrl",
"src/khepri_tx.hrl",
],
)
Expand Down
48 changes: 32 additions & 16 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
end,
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.

delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
Path = khepri_route_path(
VHost,
Name,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = #if_has_data{}),
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
maps:fold(fun(_P, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], Bindings).
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
Pattern = khepri_route_path(
VHost,
SrcName,
?KHEPRI_WILDCARD_STAR, %% Kind
?KHEPRI_WILDCARD_STAR, %% DstName
#if_has_data{}), %% RoutingKey
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, SrcName, _Kind, _Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], Bindings).

%% -------------------------------------------------------------------
%% delete_for_destination_in_mnesia().
Expand Down Expand Up @@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
?KHEPRI_WILDCARD_STAR, %% SrcName
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
?KHEPRI_WILDCARD_STAR), %% RoutingKey
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
Bindings = maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, _SrcName, Kind, Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

Expand Down
50 changes: 40 additions & 10 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,19 @@ update_in_khepri(XName, Fun) ->
Path = khepri_exchange_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := X, payload_version := Vsn}} ->
{ok, QueryRet} ->
{X, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
X1 = Fun(X),
UpdatePath =
khepri_path:combine_with_conditions(
Expand Down Expand Up @@ -534,8 +546,19 @@ next_serial_in_khepri(XName) ->
Path = khepri_exchange_serial_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Serial,
payload_version := Vsn}} ->
{ok, QueryRet} ->
{Serial, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return
%% `khepri_adv:node_props_map()` instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Expand Down Expand Up @@ -711,13 +734,20 @@ delete_all_in_khepri_tx(VHostName) ->
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
Deletions =
maps:fold(
fun(_Path, #{data := X}, Deletions) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
fun(Path, Props, Deletions) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
#{data := X}} ->
{deleted,
#exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1);
{_, _} ->
Deletions
end
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.

Expand Down
16 changes: 14 additions & 2 deletions deps/rabbit/src/rabbit_db_msup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,20 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
mirroring_pid = Overall,
childspec = ChildSpec},
case rabbit_khepri:adv_get(Path) of
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}} ->
{ok, QueryRet} ->
{#mirrored_sup_childspec{mirroring_pid = Pid}, Vsn} =
case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
case Overall of
Pid ->
Delegate;
Expand Down
80 changes: 62 additions & 18 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,16 @@ delete_in_khepri(QueueName, OnlyDurable) ->
fun () ->
Path = khepri_queue_path(QueueName),
case khepri_tx_adv:delete(Path) of
%% Khepri 0.16 and below returned `khepri:node_props()' for
%% adv queries and commands targeting one node:
{ok, #{data := _}} ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
{ok, #{Path := #{data := _}}} ->
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
{ok, _} ->
ok
end
Expand Down Expand Up @@ -606,7 +612,19 @@ update_in_khepri(QName, Fun) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q, payload_version := Vsn}} ->
{ok, QueryRet} ->
{Q, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Q1 = Fun(Q),
Expand Down Expand Up @@ -657,11 +675,23 @@ update_decorators_in_khepri(QName, Decorators) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q1, payload_version := Vsn}} ->
Q2 = amqqueue:set_decorators(Q1, Decorators),
{ok, QueryRet} ->
{Q, Vsn} = case QueryRet of
%% Khepri 0.16 and below returned
%% `khepri:node_props()' for adv queries and
%% commands targeting one node:
#{data := Data, payload_version := V} ->
{Data, V};
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := Data,
payload_version := V}} ->
{Data, V}
end,
Q1 = amqqueue:set_decorators(Q, Decorators),
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Ret2 = rabbit_khepri:put(UpdatePath, Q2),
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
case Ret2 of
ok -> ok;
{error, {khepri, mismatching_node, _}} ->
Expand Down Expand Up @@ -1102,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Res = rabbit_khepri:transaction(
fun() ->
rabbit_misc:fold_while_ok(
fun({Path, QName}, Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
{ok, [{QName, Deletions} | Acc]};
{ok, _} ->
{ok, Acc};
{error, _} = Error ->
Error
end
end, [], Qs)
do_delete_transient_queues_in_khepri_tx(Qs, [])
end),
case Res of
{ok, Items} ->
Expand All @@ -1129,6 +1146,33 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Error
end.

do_delete_transient_queues_in_khepri_tx([], Acc) ->
{ok, Acc};
do_delete_transient_queues_in_khepri_tx([{Path, QName} | Rest], Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, Res} ->
Acc1 = case Res of
%% Khepri 0.16 and below returned `khepri:node_props()'
%% for adv queries and commands targeting one node:
#{data := _} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
[{QName, Deletions} | Acc];
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
%% instead.
#{Path := #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
[{QName, Deletions} | Acc];
_ ->
Acc
end,
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% foreach_transient().
%% -------------------------------------------------------------------
Expand Down
32 changes: 27 additions & 5 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ set_in_khepri(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case rabbit_khepri:adv_put(Path, Record) of
%% Khepri 0.16 and below returned `khepri:node_props()' for adv queries
%% and commands targeting one node:
{ok, #{data := Params}} ->
{old, Params#runtime_parameters.value};
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
end.
Expand Down Expand Up @@ -114,8 +119,13 @@ set_in_khepri_tx(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case khepri_tx_adv:put(Path, Record) of
%% Khepri 0.16 and below returned `khepri:node_props()' for adv
%% queries and commands targeting one node:
{ok, #{data := Params}} ->
{old, Params#runtime_parameters.value};
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
end.
Expand Down Expand Up @@ -347,11 +357,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].

delete_vhost_in_khepri(VHostName) ->
Path = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Path) of
{ok, Props} ->
{ok, rabbit_khepri:collect_payloads(Props)};
Pattern = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Pattern) of
{ok, NodePropsMap} ->
RTParams =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
VHostName, _, _),
#{data := RTParam}} ->
[RTParam | Acc];
{_, _} ->
Acc
end
end, [], NodePropsMap),
{ok, RTParams};
{error, _} = Err ->
Err
end.
Expand Down
Loading
Loading