From ed549ed55b545373aecc070ec116f0a58d661f68 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 14 Nov 2024 13:09:00 -0500 Subject: [PATCH 1/4] Update Ra dependency to 2.15.0 Khepri v0.17 needs a change in Ra 2.15.0 that exposes more data in the `ra_aux` API. --- MODULE.bazel | 4 ++-- rabbitmq-components.mk | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/MODULE.bazel b/MODULE.bazel index 5df2a0233fc..c3797c602f0 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -253,8 +253,8 @@ erlang_package.hex_package( name = "ra", build_file = "@rabbitmq-server//bazel:BUILD.ra", pkg = "ra", - sha256 = "1d553dd971a0b398b7af0fa8c8458dda575715ff71c65c972e9500b24039b240", - version = "2.14.0", + sha256 = "cfc0dbe5ebbd54f44081f95ea6a1daeb28a89df82aa9baa234f68abbb36bdc67", + version = "2.15.0", ) erlang_package.git_package( diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 594edf66a4d..b9ad72bb428 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -50,7 +50,7 @@ dep_khepri = hex 0.16.0 dep_khepri_mnesia_migration = hex 0.7.1 dep_osiris = git https://github.com/rabbitmq/osiris v1.8.5 dep_prometheus = hex 4.11.0 -dep_ra = hex 2.14.0 +dep_ra = hex 2.15.0 dep_ranch = hex 2.1.0 dep_recon = hex 2.5.6 dep_redbug = hex 2.0.7 From 2ad9832b085c7693914ea980b81181dd4ac570c6 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 14 Nov 2024 13:21:00 -0500 Subject: [PATCH 2/4] WIP: Update Khepri dependency to 0.17.0 --- MODULE.bazel | 6 +++--- bazel/BUILD.khepri | 3 ++- rabbitmq-components.mk | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/MODULE.bazel b/MODULE.bazel index c3797c602f0..ab47d9461fe 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -207,11 +207,11 @@ erlang_package.hex_package( version = "1.4.1", ) -erlang_package.hex_package( +erlang_package.git_package( name = "khepri", build_file = "@rabbitmq-server//bazel:BUILD.khepri", - sha256 = "feee8a0a1f3f78dd9f8860feacba63cc165c81af1b351600903e34a20676d5f6", - version = "0.16.0", + repository = "rabbitmq/khepri", + commit = "3aacfb5e873871e1c6bca6a1f71d539bd668e119", ) erlang_package.hex_package( diff --git a/bazel/BUILD.khepri b/bazel/BUILD.khepri index 1e4c6a294d8..bb0e09002bd 100644 --- a/bazel/BUILD.khepri +++ b/bazel/BUILD.khepri @@ -56,6 +56,7 @@ filegroup( "src/khepri_path.erl", "src/khepri_pattern_tree.erl", "src/khepri_payload.erl", + "src/khepri_prefix_tree.erl", "src/khepri_projection.erl", "src/khepri_sproc.erl", "src/khepri_sup.erl", @@ -74,10 +75,10 @@ filegroup( "src/khepri_error.hrl", "src/khepri_evf.hrl", "src/khepri_machine.hrl", + "src/khepri_node.hrl", "src/khepri_payload.hrl", "src/khepri_projection.hrl", "src/khepri_ret.hrl", - "src/khepri_tree.hrl", "src/khepri_tx.hrl", ], ) diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index b9ad72bb428..0bdf8d3ed8e 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -46,7 +46,7 @@ dep_credentials_obfuscation = hex 3.4.0 dep_cuttlefish = hex 3.4.0 dep_gen_batch_server = hex 0.8.8 dep_jose = hex 1.11.10 -dep_khepri = hex 0.16.0 +dep_khepri = git https://github.com/rabbitmq/khepri 3aacfb5e873871e1c6bca6a1f71d539bd668e119 dep_khepri_mnesia_migration = hex 0.7.1 dep_osiris = git https://github.com/rabbitmq/osiris v1.8.5 dep_prometheus = hex 4.11.0 From 692e96e5e711e6e4382af7e3ef8b129fc0bf6cfb Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 14 Nov 2024 15:29:59 -0500 Subject: [PATCH 3/4] rabbit_khepri: Use rabbit_db_cluster's members/2 and nodes/2 `locally_known_members/1` and `locally_known_node/1` were replaced with `members/2` and `nodes/2` with `favor` set to `low_latency` - this matches the interface for queries in Khepri. --- deps/rabbit/src/rabbit_khepri.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 7bb586525f9..847e682a88f 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -653,7 +653,7 @@ members() -> %% The returned list is empty if there was an error. locally_known_members() -> - case khepri_cluster:locally_known_members(?RA_CLUSTER_NAME) of + case khepri_cluster:members(?RA_CLUSTER_NAME, #{favor => low_latency}) of {ok, Members} -> Members; {error, _Reason} -> [] end. @@ -683,7 +683,7 @@ nodes() -> %% The returned list is empty if there was an error. locally_known_nodes() -> - case khepri_cluster:locally_known_nodes(?RA_CLUSTER_NAME) of + case khepri_cluster:nodes(?RA_CLUSTER_NAME, #{favor => low_latency}) of {ok, Nodes} -> Nodes; {error, _Reason} -> [] end. From cc6185d4e122eae1dba3ed15c6edd00119aadb88 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 14 Nov 2024 13:35:28 -0500 Subject: [PATCH 4/4] rabbit_db_*: Handle `khepri_adv:node_props_map()` returns from adv API Khepri 0.17.x will change the return of functions from the khepri_adv and khepri_tx_adv modules. Previously, functions that target one specific tree node, for example `khepri_adv:delete/3`, would return the node props map (`khepri:node_props()`) for the affected node. Now all of the "adv API" returns `khepri_adv:node_props_map()` for consistency. --- deps/rabbit/src/rabbit_db_binding.erl | 48 +++++++---- deps/rabbit/src/rabbit_db_exchange.erl | 50 +++++++++--- deps/rabbit/src/rabbit_db_msup.erl | 16 +++- deps/rabbit/src/rabbit_db_queue.erl | 80 ++++++++++++++----- deps/rabbit/src/rabbit_db_rtparams.erl | 32 ++++++-- deps/rabbit/src/rabbit_db_user.erl | 46 ++++++++--- deps/rabbit/src/rabbit_db_vhost.erl | 38 +++++++-- deps/rabbit/src/rabbit_khepri.erl | 46 ----------- .../src/rabbit_db_ch_exchange.erl | 16 +++- .../src/rabbit_db_jms_exchange.erl | 16 +++- .../src/rabbit_db_rh_exchange.erl | 16 +++- 11 files changed, 282 insertions(+), 122 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index ccae244f1c4..f6248843da7 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo end, {deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}. -delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> - Path = khepri_route_path( - VHost, - Name, - _Kind = ?KHEPRI_WILDCARD_STAR, - _DstName = ?KHEPRI_WILDCARD_STAR, - _RoutingKey = #if_has_data{}), - {ok, Bindings} = khepri_tx_adv:delete_many(Path), - maps:fold(fun(_P, #{data := Set}, Acc) -> - sets:to_list(Set) ++ Acc - end, [], Bindings). +delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) -> + Pattern = khepri_route_path( + VHost, + SrcName, + ?KHEPRI_WILDCARD_STAR, %% Kind + ?KHEPRI_WILDCARD_STAR, %% DstName + #if_has_data{}), %% RoutingKey + {ok, Bindings} = khepri_tx_adv:delete_many(Pattern), + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_ROUTE_PATH( + VHost, SrcName, _Kind, _Name, _RoutingKey), + #{data := Set}} -> + sets:to_list(Set) ++ Acc; + {_, _} -> + Acc + end + end, [], Bindings). %% ------------------------------------------------------------------- %% delete_for_destination_in_mnesia(). @@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) -> delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) -> Pattern = khepri_route_path( VHost, - _SrcName = ?KHEPRI_WILDCARD_STAR, + ?KHEPRI_WILDCARD_STAR, %% SrcName Kind, Name, - _RoutingKey = ?KHEPRI_WILDCARD_STAR), + ?KHEPRI_WILDCARD_STAR), %% RoutingKey {ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern), - Bindings = maps:fold(fun(_, #{data := Set}, Acc) -> - sets:to_list(Set) ++ Acc - end, [], BindingsMap), + Bindings = maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_ROUTE_PATH( + VHost, _SrcName, Kind, Name, _RoutingKey), + #{data := Set}} -> + sets:to_list(Set) ++ Acc; + {_, _} -> + Acc + end + end, [], BindingsMap), rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4, lists:keysort(#binding.source, Bindings), OnlyDurable). diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 40a5aee9f9f..467bd8f5472 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -331,7 +331,19 @@ update_in_khepri(XName, Fun) -> Path = khepri_exchange_path(XName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := X, payload_version := Vsn}} -> + {ok, QueryRet} -> + {X, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return `khepri_adv:node_props_map()` + %% instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, X1 = Fun(X), UpdatePath = khepri_path:combine_with_conditions( @@ -534,8 +546,19 @@ next_serial_in_khepri(XName) -> Path = khepri_exchange_serial_path(XName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := Serial, - payload_version := Vsn}} -> + {ok, QueryRet} -> + {Serial, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return + %% `khepri_adv:node_props_map()` instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), @@ -711,13 +734,20 @@ delete_all_in_khepri_tx(VHostName) -> {ok, NodeProps} = khepri_tx_adv:delete_many(Pattern), Deletions = maps:fold( - fun(_Path, #{data := X}, Deletions) -> - {deleted, #exchange{name = XName}, Bindings, XDeletions} = - rabbit_db_binding:delete_all_for_exchange_in_khepri( - X, false, true), - Deletions1 = rabbit_binding:add_deletion( - XName, X, deleted, Bindings, XDeletions), - rabbit_binding:combine_deletions(Deletions, Deletions1) + fun(Path, Props, Deletions) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _), + #{data := X}} -> + {deleted, + #exchange{name = XName}, Bindings, XDeletions} = + rabbit_db_binding:delete_all_for_exchange_in_khepri( + X, false, true), + Deletions1 = rabbit_binding:add_deletion( + XName, X, deleted, Bindings, XDeletions), + rabbit_binding:combine_deletions(Deletions, Deletions1); + {_, _} -> + Deletions + end end, rabbit_binding:new_deletions(), NodeProps), {ok, Deletions}. diff --git a/deps/rabbit/src/rabbit_db_msup.erl b/deps/rabbit/src/rabbit_db_msup.erl index a4bbb68d7e4..275f1057982 100644 --- a/deps/rabbit/src/rabbit_db_msup.erl +++ b/deps/rabbit/src/rabbit_db_msup.erl @@ -135,8 +135,20 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) -> mirroring_pid = Overall, childspec = ChildSpec}, case rabbit_khepri:adv_get(Path) of - {ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid}, - payload_version := Vsn}} -> + {ok, QueryRet} -> + {#mirrored_sup_childspec{mirroring_pid = Pid}, Vsn} = + case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return `khepri_adv:node_props_map()` + %% instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, case Overall of Pid -> Delegate; diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 542410fab2f..d593e92d5ad 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -411,10 +411,16 @@ delete_in_khepri(QueueName, OnlyDurable) -> fun () -> Path = khepri_queue_path(QueueName), case khepri_tx_adv:delete(Path) of + %% Khepri 0.16 and below returned `khepri:node_props()' for + %% adv queries and commands targeting one node: {ok, #{data := _}} -> %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); + %% Khepri 0.17+ return `khepri_adv:node_props_map()` + %% instead. + {ok, #{Path := #{data := _}}} -> + rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); {ok, _} -> ok end @@ -606,7 +612,19 @@ update_in_khepri(QName, Fun) -> Path = khepri_queue_path(QName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := Q, payload_version := Vsn}} -> + {ok, QueryRet} -> + {Q, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return `khepri_adv:node_props_map()` + %% instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), Q1 = Fun(Q), @@ -657,11 +675,23 @@ update_decorators_in_khepri(QName, Decorators) -> Path = khepri_queue_path(QName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := Q1, payload_version := Vsn}} -> - Q2 = amqqueue:set_decorators(Q1, Decorators), + {ok, QueryRet} -> + {Q, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return `khepri_adv:node_props_map()` + %% instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, + Q1 = amqqueue:set_decorators(Q, Decorators), UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), - Ret2 = rabbit_khepri:put(UpdatePath, Q2), + Ret2 = rabbit_khepri:put(UpdatePath, Q1), case Ret2 of ok -> ok; {error, {khepri, mismatching_node, _}} -> @@ -1102,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) -> do_delete_transient_queues_in_khepri(Qs, FilterFun) -> Res = rabbit_khepri:transaction( fun() -> - rabbit_misc:fold_while_ok( - fun({Path, QName}, Acc) -> - %% Also see `delete_in_khepri/2'. - case khepri_tx_adv:delete(Path) of - {ok, #{data := _}} -> - Deletions = rabbit_db_binding:delete_for_destination_in_khepri( - QName, false), - {ok, [{QName, Deletions} | Acc]}; - {ok, _} -> - {ok, Acc}; - {error, _} = Error -> - Error - end - end, [], Qs) + do_delete_transient_queues_in_khepri_tx(Qs, []) end), case Res of {ok, Items} -> @@ -1129,6 +1146,33 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) -> Error end. +do_delete_transient_queues_in_khepri_tx([], Acc) -> + {ok, Acc}; +do_delete_transient_queues_in_khepri_tx([{Path, QName} | Rest], Acc) -> + %% Also see `delete_in_khepri/2'. + case khepri_tx_adv:delete(Path) of + {ok, Res} -> + Acc1 = case Res of + %% Khepri 0.16 and below returned `khepri:node_props()' + %% for adv queries and commands targeting one node: + #{data := _} -> + Deletions = rabbit_db_binding:delete_for_destination_in_khepri( + QName, false), + [{QName, Deletions} | Acc]; + %% Khepri 0.17+ return `khepri_adv:node_props_map()` + %% instead. + #{Path := #{data := _}} -> + Deletions = rabbit_db_binding:delete_for_destination_in_khepri( + QName, false), + [{QName, Deletions} | Acc]; + _ -> + Acc + end, + do_delete_transient_queues_in_khepri_tx(Rest, Acc1); + {error, _} = Error -> + Error + end. + %% ------------------------------------------------------------------- %% foreach_transient(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_rtparams.erl b/deps/rabbit/src/rabbit_db_rtparams.erl index 74e9a6a1f3d..14566f70498 100644 --- a/deps/rabbit/src/rabbit_db_rtparams.erl +++ b/deps/rabbit/src/rabbit_db_rtparams.erl @@ -59,8 +59,13 @@ set_in_khepri(Key, Term) -> Record = #runtime_parameters{key = Key, value = Term}, case rabbit_khepri:adv_put(Path, Record) of + %% Khepri 0.16 and below returned `khepri:node_props()' for adv queries + %% and commands targeting one node: {ok, #{data := Params}} -> {old, Params#runtime_parameters.value}; + %% Khepri 0.17+ return `khepri_adv:node_props_map()` instead. + {ok, #{Path := #{data := Params}}} -> + {old, Params#runtime_parameters.value}; {ok, _} -> new end. @@ -114,8 +119,13 @@ set_in_khepri_tx(Key, Term) -> Record = #runtime_parameters{key = Key, value = Term}, case khepri_tx_adv:put(Path, Record) of + %% Khepri 0.16 and below returned `khepri:node_props()' for adv + %% queries and commands targeting one node: {ok, #{data := Params}} -> {old, Params#runtime_parameters.value}; + %% Khepri 0.17+ return `khepri_adv:node_props_map()` instead. + {ok, #{Path := #{data := Params}}} -> + {old, Params#runtime_parameters.value}; {ok, _} -> new end. @@ -347,11 +357,23 @@ delete_vhost_in_mnesia_tx(VHostName) -> <- mnesia:match_object(?MNESIA_TABLE, Match, read)]. delete_vhost_in_khepri(VHostName) -> - Path = khepri_vhost_rp_path( - VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), - case rabbit_khepri:adv_delete_many(Path) of - {ok, Props} -> - {ok, rabbit_khepri:collect_payloads(Props)}; + Pattern = khepri_vhost_rp_path( + VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), + case rabbit_khepri:adv_delete_many(Pattern) of + {ok, NodePropsMap} -> + RTParams = + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH( + VHostName, _, _), + #{data := RTParam}} -> + [RTParam | Acc]; + {_, _} -> + Acc + end + end, [], NodePropsMap), + {ok, RTParams}; {error, _} = Err -> Err end. diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl index 3dfdbf8716b..a8420467256 100644 --- a/deps/rabbit/src/rabbit_db_user.erl +++ b/deps/rabbit/src/rabbit_db_user.erl @@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) -> clear_all_permissions_for_vhost_in_khepri(VHostName) -> rabbit_khepri:transaction( fun() -> - UserPermissionsPath = khepri_user_permission_path( - ?KHEPRI_WILDCARD_STAR, VHostName), - TopicPermissionsPath = khepri_topic_permission_path( - ?KHEPRI_WILDCARD_STAR, VHostName, - ?KHEPRI_WILDCARD_STAR), - {ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath), - {ok, TopicProps} = khepri_tx_adv:delete_many( - TopicPermissionsPath), - Deletions = rabbit_khepri:collect_payloads( - TopicProps, - rabbit_khepri:collect_payloads(UserProps)), - {ok, Deletions} + clear_all_permissions_for_vhost_in_khepri_tx(VHostName) end, rw, #{timeout => infinity}). +clear_all_permissions_for_vhost_in_khepri_tx(VHostName) -> + UserPermissionsPattern = khepri_user_permission_path( + ?KHEPRI_WILDCARD_STAR, VHostName), + TopicPermissionsPattern = khepri_topic_permission_path( + ?KHEPRI_WILDCARD_STAR, VHostName, + ?KHEPRI_WILDCARD_STAR), + {ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern), + {ok, TopicNodePropsMap} = khepri_tx_adv:delete_many( + TopicPermissionsPattern), + Deletions0 = + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _), + #{data := Permission}} -> + [Permission | Acc]; + {_, _} -> + Acc + end + end, [], UserNodePropsMap), + Deletions1 = + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _), + #{data := Permission}} -> + [Permission | Acc]; + {_, _} -> + Acc + end + end, Deletions0, TopicNodePropsMap), + {ok, Deletions1}. + %% ------------------------------------------------------------------- %% get_topic_permissions(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_vhost.erl b/deps/rabbit/src/rabbit_db_vhost.erl index a4a0b32dc53..88f02d962f8 100644 --- a/deps/rabbit/src/rabbit_db_vhost.erl +++ b/deps/rabbit/src/rabbit_db_vhost.erl @@ -165,11 +165,23 @@ merge_metadata_in_khepri(VHostName, Metadata) -> Path = khepri_vhost_path(VHostName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := VHost0, payload_version := DVersion}} -> + {ok, QueryRet} -> + {VHost0, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return + %% `khepri_adv:node_props_map()` instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, VHost = vhost:merge_metadata(VHost0, Metadata), rabbit_log:debug("Updating a virtual host record ~p", [VHost]), Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), Ret2 = rabbit_khepri:put(Path1, VHost), case Ret2 of ok -> @@ -411,13 +423,25 @@ update_in_mnesia_tx(VHostName, UpdateFun) update_in_khepri(VHostName, UpdateFun) -> Path = khepri_vhost_path(VHostName), case rabbit_khepri:adv_get(Path) of - {ok, #{data := V, payload_version := DVersion}} -> - V1 = UpdateFun(V), + {ok, QueryRet} -> + {VHost0, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return + %% `khepri_adv:node_props_map()` instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, + VHost1 = UpdateFun(VHost0), Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), - case rabbit_khepri:put(Path1, V1) of + Path, [#if_payload_version{version = Vsn}]), + case rabbit_khepri:put(Path1, VHost1) of ok -> - V1; + VHost1; {error, {khepri, mismatching_node, _}} -> update_in_khepri(VHostName, UpdateFun); Error -> diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 847e682a88f..62da836d675 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -175,10 +175,6 @@ -export([force_shrink_member_to_current_member/0]). -%% Helpers for working with the Khepri API / types. --export([collect_payloads/1, - collect_payloads/2]). - -ifdef(TEST). -export([force_metadata_store/1, clear_forced_metadata_store/0]). @@ -1104,48 +1100,6 @@ handle_async_ret(RaEvent) -> fence(Timeout) -> khepri:fence(?STORE_ID, Timeout). -%% ------------------------------------------------------------------- -%% collect_payloads(). -%% ------------------------------------------------------------------- - --spec collect_payloads(Props) -> Ret when - Props :: khepri:node_props(), - Ret :: [Payload], - Payload :: term(). - -%% @doc Collects all payloads from a node props map. -%% -%% This is the same as calling `collect_payloads(Props, [])'. -%% -%% @private - -collect_payloads(Props) when is_map(Props) -> - collect_payloads(Props, []). - --spec collect_payloads(Props, Acc0) -> Ret when - Props :: khepri:node_props(), - Acc0 :: [Payload], - Ret :: [Payload], - Payload :: term(). - -%% @doc Collects all payloads from a node props map into the accumulator list. -%% -%% This is meant to be used with the `khepri_adv' API to easily collect the -%% payloads from the return value of `khepri_adv:delete_many/4' for example. -%% -%% @returns all payloads in the node props map collected into a list, with -%% `Acc0' as the tail. -%% -%% @private - -collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) -> - maps:fold( - fun (_Path, #{data := Payload}, Acc) -> - [Payload | Acc]; - (_Path, _NoPayload, Acc) -> - Acc - end, Acc0, Props). - -spec unregister_legacy_projections() -> Ret when Ret :: ok | timeout_error(). %% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x diff --git a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl index 83a3ac208e6..def09de1df0 100644 --- a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl +++ b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl @@ -104,13 +104,25 @@ create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun) -> create_binding_in_khepri(Src, Dst, Weight, UpdateFun) -> Path = khepri_consistent_hash_path(Src), case rabbit_khepri:adv_get(Path) of - {ok, #{data := Chx0, payload_version := DVersion}} -> + {ok, QueryRet} -> + {Chx0, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return + %% `khepri_adv:node_props_map()` instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, case UpdateFun(Chx0, Dst, Weight) of already_exists -> already_exists; Chx -> Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), Ret2 = rabbit_khepri:put(Path1, Chx), case Ret2 of ok -> diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl index 36d922d7634..a9cbcd713b8 100644 --- a/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl +++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl @@ -108,9 +108,21 @@ create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) -> update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) -> Path = khepri_jms_topic_exchange_path(XName), case rabbit_khepri:adv_get(Path) of - {ok, #{data := BindingFuns, payload_version := DVersion}} -> + {ok, QueryRet} -> + {BindingFuns, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries + %% and commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return + %% `khepri_adv:node_props_map()` instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)), case Ret of ok -> ok; diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl index a6eeef97a75..fb23531ca0f 100644 --- a/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl +++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl @@ -106,10 +106,22 @@ insert0_in_mnesia(Key, Cached, Message, Length) -> insert_in_khepri(XName, Message, Length) -> Path = khepri_recent_history_path(XName), case rabbit_khepri:adv_get(Path) of - {ok, #{data := Cached0, payload_version := DVersion}} -> + {ok, QueryRet} -> + {Cached0, Vsn} = case QueryRet of + %% Khepri 0.16 and below returned + %% `khepri:node_props()' for adv queries and + %% commands targeting one node: + #{data := Data, payload_version := V} -> + {Data, V}; + %% Khepri 0.17+ return + %% `khepri_adv:node_props_map()` instead. + #{Path := #{data := Data, + payload_version := V}} -> + {Data, V} + end, Cached = add_to_cache(Cached0, Message, Length), Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), Ret = rabbit_khepri:put(Path1, Cached), case Ret of ok ->