From 53476761c41692cc364afb8499a4669f90052e08 Mon Sep 17 00:00:00 2001 From: Rin Kuryloski Date: Tue, 22 Aug 2023 11:06:24 +0200 Subject: [PATCH 01/14] Use override_module instead of override_repository in oci workflow --- .github/workflows/rabbitmq-oci.yaml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/.github/workflows/rabbitmq-oci.yaml b/.github/workflows/rabbitmq-oci.yaml index f9a66272..6291891d 100644 --- a/.github/workflows/rabbitmq-oci.yaml +++ b/.github/workflows/rabbitmq-oci.yaml @@ -86,18 +86,15 @@ jobs: - name: Configure the ra override for this ra working-directory: rabbitmq-server run: | - sudo npm install --global --silent @bazel/buildozer - - rules_erlang_version="$(cat MODULE.bazel | buildozer 'print version' -:rules_erlang)" - ra_repo="rules_erlang~$rules_erlang_version~erlang_package~ra" - cat << EOF >> user.bazelrc - build --override_repository $ra_repo=${{ github.workspace }}/ra + build --override_module rabbitmq_ra=${{ github.workspace }}/ra EOF - name: Configure otp for the OCI image working-directory: rabbitmq-server run: | + sudo npm install --global --silent @bazel/buildozer + buildozer 'set tars ["@otp_src_${{ matrix.otp_version_id }}//file"]' \ //packaging/docker-image:otp_source From 7c53d8e73d1194ffdd286db467f4756a1b3a9b52 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 1 Sep 2023 10:40:29 +0100 Subject: [PATCH 02/14] fix ra_log_segment:dump/1 --- src/ra_log_segment.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index 83452ae1..bb43277c 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -452,7 +452,7 @@ dump_index(File) -> dump(File) -> {ok, S0} = open(File, #{mode => read}), {Idx, Last} = range(S0), - L = fold(S0, Idx, Last - Idx + 1, fun erlang:binary_to_term/1, + L = fold(S0, Idx, Last, fun erlang:binary_to_term/1, fun (E, A) -> [E | A] end, []), close(S0), lists:reverse(L). From 498831980eaba76b8b1a1b752458b4973f7fc6a1 Mon Sep 17 00:00:00 2001 From: Rin Kuryloski Date: Tue, 5 Sep 2023 10:33:20 +0200 Subject: [PATCH 03/14] Add rabbitmq/bazel-central-registry@erlang-packages bzlmod registry Allows bazel to find our internal packages that may or may not be on BCR --- .bazelrc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.bazelrc b/.bazelrc index 975fb524..1bf9ff18 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,5 +1,8 @@ build --enable_bzlmod +build --registry=https://bcr.bazel.build/ +build --registry=https://raw.githubusercontent.com/rabbitmq/bazel-central-registry/erlang-packages/ + build --incompatible_strict_action_env build --local_test_jobs=1 From 5ce1e82037feb4d63118f9cd8e61125be8880bbf Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 27 Jun 2023 13:10:16 +0100 Subject: [PATCH 04/14] Seshat 0.6.0 and ra:key_metrics/1 ra:key_metrics/1 will return key metrics, previously inserted into the ra_metrics table (which remains for now). The aim is to allow this function to always return and use counters instead to be able to see progress during recovery. 
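As an illustrative sketch of the intended call shape (the field names are the ones introduced in this patch; the server id and all values are invented):

    1> ra:key_metrics({my_cluster, node()}).
    #{state => leader,
      last_applied => 1000,
      commit_index => 1000,
      snapshot_index => 900,
      last_written_index => 1000,
      last_index => 1000,
      commit_latency => 5,
      term => 1}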
ensure term is updated for snapshot installations --- MODULE.bazel | 2 +- Makefile | 2 +- src/ra.erl | 40 +++++++++++++++++++- src/ra.hrl | 36 +++++++++++++++++- src/ra_counters.erl | 19 ++++++---- src/ra_log.erl | 32 +++++++++++++--- src/ra_server.erl | 50 ++++++++++++++++--------- src/ra_server_proc.erl | 5 ++- test/coordination_SUITE.erl | 75 ++++++++++++++++++++++++++++++++++++- 9 files changed, 224 insertions(+), 37 deletions(-) diff --git a/MODULE.bazel b/MODULE.bazel index dc7e2f2e..dabf401e 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -72,7 +72,7 @@ erlang_package.git_package( erlang_package.hex_package( name = "seshat", - version = "0.4.0", + version = "0.6.0", ) use_repo( diff --git a/Makefile b/Makefile index 1170f63f..444281e6 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli dep_gen_batch_server = hex 0.8.8 dep_aten = hex 0.5.8 -dep_seshat = hex 0.4.0 +dep_seshat = hex 0.6.0 DEPS = aten gen_batch_server seshat TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy diff --git a/src/ra.erl b/src/ra.erl index 14c9a84c..286bb46b 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -78,7 +78,8 @@ cast_aux_command/2, register_external_log_reader/1, member_overview/1, - member_overview/2 + member_overview/2, + key_metrics/1 ]). %% xref should pick these up @@ -1107,6 +1108,43 @@ member_overview(ServerId) -> member_overview(ServerId, Timeout) -> ra_server_proc:local_state_query(ServerId, overview, Timeout). +%% @doc Returns a map of key metrics about a Ra member +%% +%% The keys and values may vary depending on what state +%% the member is in. This function will never call into the +%% Ra process itself so is likely to return swiftly even +%% when the Ra process is busy (such as when it is recovering) +%% +%% @param ServerId the Ra server to obtain key metrics for +%% @end +key_metrics({Name, N} = ServerId) when N == node() -> + Fields = [last_applied, + commit_index, + snapshot_index, + last_written_index, + last_index, + commit_latency, + term], + Counters = case ra_counters:counters(ServerId, Fields) of + undefined -> + #{}; + C -> C + end, + case whereis(Name) of + undefined -> + Counters#{state => noproc}; + _ -> + case ets:lookup(ra_state, Name) of + [] -> + Counters#{state => unknown}; + [{_, State}] -> + Counters#{state => State} + end + end; +key_metrics({_, N} = ServerId) -> + erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId]). + + %% internal -spec usr(UserCommand, ReplyMode) -> Command when diff --git a/src/ra.hrl b/src/ra.hrl index e706fc5a..08d5c5b3 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -269,6 +269,7 @@ -define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18). -define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19). -define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20). +-define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 21). -define(RA_SRV_COUNTER_FIELDS, @@ -312,7 +313,38 @@ {local_queries, ?C_RA_SRV_LOCAL_QUERIES, counter, "Total number of local queries"}, {invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter, - "Total number of commands received with an invalid reply-mode"} + "Total number of commands received with an invalid reply-mode"}, + {reserved_2, ?C_RA_SRV_RESERVED, counter, "Reserved counter"} ]). --define(RA_COUNTER_FIELDS, ?RA_LOG_COUNTER_FIELDS ++ ?RA_SRV_COUNTER_FIELDS). +-define(C_RA_SVR_METRIC_LAST_APPLIED, ?C_RA_SRV_RESERVED + 1). +-define(C_RA_SVR_METRIC_COMMIT_INDEX, ?C_RA_SRV_RESERVED + 2). 
+-define(C_RA_SVR_METRIC_SNAPSHOT_INDEX, ?C_RA_SRV_RESERVED + 3). +-define(C_RA_SVR_METRIC_LAST_INDEX, ?C_RA_SRV_RESERVED + 4). +-define(C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ?C_RA_SRV_RESERVED + 5). +-define(C_RA_SVR_METRIC_COMMIT_LATENCY, ?C_RA_SRV_RESERVED + 6). +-define(C_RA_SVR_METRIC_TERM, ?C_RA_SRV_RESERVED + 7). + +-define(RA_SRV_METRICS_COUNTER_FIELDS, + [ + {last_applied, ?C_RA_SVR_METRIC_LAST_APPLIED, gauge, + "The last applied index. Can go backwards if a ra server is restarted."}, + {commit_index, ?C_RA_SVR_METRIC_COMMIT_INDEX, counter, + "The current commit index."}, + {snapshot_index, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, counter, + "The current snapshot index."}, + {last_index, ?C_RA_SVR_METRIC_LAST_INDEX, counter, + "The last index of the log."}, + {last_written_index, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, counter, + "The last fully written and fsynced index of the log."}, + {commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge, + "Approximate time taken from an entry being written to the log until it is committed."}, + {term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."} + ]). + +-define(RA_COUNTER_FIELDS, + ?RA_LOG_COUNTER_FIELDS ++ + ?RA_SRV_COUNTER_FIELDS ++ + ?RA_SRV_METRICS_COUNTER_FIELDS). + +-define(FIELDSPEC_KEY, ra_seshat_fields_spec). diff --git a/src/ra_counters.erl b/src/ra_counters.erl index 42252b9a..ea73c7d2 100644 --- a/src/ra_counters.erl +++ b/src/ra_counters.erl @@ -5,6 +5,7 @@ %% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. %% -module(ra_counters). +-include("ra.hrl"). -export([ init/0, @@ -12,25 +13,24 @@ fetch/1, overview/0, overview/1, + counters/2, delete/1 ]). -type name() :: term(). --type seshat_field_spec() :: - {Name :: atom(), Position :: pos_integer(), - Type :: counter | gauge, Description :: string()}. + -spec init() -> ok. init() -> _ = application:ensure_all_started(seshat), _ = seshat:new_group(ra), + persistent_term:put(?FIELDSPEC_KEY, ?RA_COUNTER_FIELDS), ok. --spec new(name(), [seshat_field_spec()]) -> +-spec new(name(), seshat:fields_spec()) -> counters:counters_ref(). -new(Name, Fields) - when is_list(Fields) -> - seshat:new(ra, Name, Fields). +new(Name, FieldsSpec) -> + seshat:new(ra, Name, FieldsSpec). -spec fetch(name()) -> undefined | counters:counters_ref(). fetch(Name) -> @@ -47,3 +47,8 @@ overview() -> -spec overview(name()) -> #{atom() => non_neg_integer()}. overview(Name) -> seshat:overview(ra, Name). + +-spec counters(name(), [atom()]) -> + #{atom() => non_neg_integer()} | undefined. +counters(Name, Fields) -> + seshat:counters(ra, Name, Fields). diff --git a/src/ra_log.erl b/src/ra_log.erl index f7df25f8..d8371396 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -200,8 +200,10 @@ init(#{uid := UId, reader = Reader, snapshot_state = SnapshotState }, - + put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), LastIdx = State000#?MODULE.last_index, + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx), % recover the last term {LastTerm0, State00} = case LastIdx of SnapIdx -> @@ -374,7 +376,8 @@ last_written(#?MODULE{last_written_index_term = LWTI}) -> %% forces the last index and last written index back to a prior index -spec set_last_index(ra_index(), state()) -> {ok, state()} | {not_found, state()}. 
-set_last_index(Idx, #?MODULE{cache = Cache0, +set_last_index(Idx, #?MODULE{cfg = Cfg, + cache = Cache0, last_written_index_term = {LWIdx0, _}} = State0) -> case fetch_term(Idx, State0) of {undefined, State} -> @@ -385,6 +388,8 @@ set_last_index(Idx, #?MODULE{cache = Cache0, %% this should always be found but still assert just in case true = LWTerm =/= undefined, Cache = ra_log_cache:set_last(Idx, Cache0), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx), {ok, State2#?MODULE{last_index = Idx, last_term = Term, cache = Cache, @@ -401,7 +406,8 @@ handle_event({written, {FromIdx, _ToIdx, _Term}}, %% Just drop the event in this case as it is stale {State, []}; handle_event({written, {FromIdx, ToIdx0, Term}}, - #?MODULE{last_written_index_term = {LastWrittenIdx0, + #?MODULE{cfg = Cfg, + last_written_index_term = {LastWrittenIdx0, LastWrittenTerm0}, last_index = LastIdx, snapshot_state = SnapState} = State0) @@ -416,6 +422,7 @@ handle_event({written, {FromIdx, ToIdx0, Term}}, ToIdx = min(ToIdx0, LastIdx), case fetch_term(ToIdx, State0) of {Term, State} when is_integer(Term) -> + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ToIdx), {State#?MODULE{last_written_index_term = {ToIdx, Term}}, %% delaying truncate_cache until the next event allows any entries %% that became committed to be read from cache rather than ETS @@ -426,8 +433,10 @@ handle_event({written, {FromIdx, ToIdx0, Term}}, % followers returning appending the entry and the leader committing % and processing a snapshot before the written event comes in. % ensure last_written_index_term does not go backwards - LastWrittenIdxTerm = {max(LastWrittenIdx0, ToIdx), + LastWrittenIdx = max(LastWrittenIdx0, ToIdx), + LastWrittenIdxTerm = {LastWrittenIdx, max(LastWrittenTerm0, Term)}, + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastWrittenIdx), {State#?MODULE{last_written_index_term = LastWrittenIdxTerm}, [{next_event, {ra_log_event, {truncate_cache, FromIdx, ToIdx}}}]}; {OtherTerm, State} -> @@ -491,13 +500,15 @@ handle_event({segments, Tid, NewSegs}, {State, log_update_effects(Readers, Pid, State)} end; handle_event({snapshot_written, {SnapIdx, _} = Snap}, - #?MODULE{first_index = FstIdx, + #?MODULE{cfg = Cfg, + first_index = FstIdx, snapshot_state = SnapState0} = State0) %% only update snapshot if it is newer than the last snapshot when SnapIdx >= FstIdx -> % delete any segments outside of first_index {State, Effects0} = delete_segments(SnapIdx, State0), SnapState = ra_snapshot:complete_snapshot(Snap, SnapState0), + put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), %% delete old snapshot files %% This is done as an effect %% so that if an old snapshot is still being replicated @@ -573,6 +584,9 @@ install_snapshot({SnapIdx, _} = IdxTerm, SnapState, #?MODULE{cfg = Cfg, cache = Cache} = State0) -> ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_INSTALLED, 1), + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, SnapIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, SnapIdx), {State, Effs} = delete_segments(SnapIdx, State0), {State#?MODULE{snapshot_state = SnapState, first_index = SnapIdx + 1, @@ -837,6 +851,7 @@ wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId, % and that prior entries should be considered stale ok = ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd), ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), + ok = put_counter(Cfg, 
?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, last_term = Term, cache = ra_log_cache:add(Entry, Cache)}. @@ -847,6 +862,7 @@ wal_write(#?MODULE{cfg = #cfg{uid = UId, case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of ok -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, last_term = Term, cache = ra_log_cache:add(Entry, Cache)}; {error, wal_down} -> @@ -869,6 +885,7 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId, case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of ok -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), State#?MODULE{last_index = LastIdx, last_term = LastTerm, cache = Cache}; @@ -1030,6 +1047,11 @@ incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> incr_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. +put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> + counters:put(Cnt, Ix, N); +put_counter(#cfg{counter = undefined}, _Ix, _N) -> + ok. + server_data_dir(Dir, UId) -> Me = ra_lib:to_list(UId), filename:join(Dir, Me). diff --git a/src/ra_server.erl b/src/ra_server.erl index 1ac231d3..e69bb923 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -324,6 +324,9 @@ init(#{id := Id, max_append_entries_rpc_batch_size = MaxAERBatchSize, counter = maps:get(counter, Config, undefined), system_config = SystemConfig}, + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm), #{cfg => Cfg, current_term => CurrentTerm, @@ -350,19 +353,21 @@ init(#{id := Id, recover(#{cfg := #cfg{log_id = LogId, machine_version = MacVer, - effective_machine_version = EffMacVer}, + effective_machine_version = EffMacVer} = Cfg, commit_index := CommitIndex, last_applied := LastApplied} = State0) -> + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, LastApplied), ?DEBUG("~ts: recovering state machine version ~b:~b from index ~b to ~b", [LogId, EffMacVer, MacVer, LastApplied, CommitIndex]), Before = erlang:system_time(millisecond), {#{log := Log0, cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State, _} = apply_to(CommitIndex, - fun(E, S0) -> + fun({Idx, _, _} = E, S0) -> %% Clear out the effects and notifies map %% to avoid memory explosion {Mod, LastAppl, S, MacSt, _E, _N, LastTs} = apply_with(E, S0), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, Idx), {Mod, LastAppl, S, MacSt, [], #{}, LastTs} end, State0, []), @@ -372,6 +377,7 @@ recover(#{cfg := #cfg{log_id = LogId, [LogId, EffMacVerAfter, MacVer, LastApplied, CommitIndex, After - Before]), %% disable segment read cache by setting random access pattern Log = ra_log:release_resources(1, random, Log0), + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0), State#{log => Log, %% reset commit latency as recovery may calculate a very old value commit_latency => 0}. 
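Because the last_applied gauge is updated inside the recovery fold above, progress can be observed without calling into the (busy) server process. A minimal sketch of such an observer, assuming only ra:key_metrics/1 from this patch (the function name and poll interval are invented):

    watch_recovery(ServerId) ->
        case ra:key_metrics(ServerId) of
            #{state := recover, last_applied := LastApplied} ->
                io:format("recovered to index ~b~n", [LastApplied]),
                timer:sleep(100),
                watch_recovery(ServerId);
            Metrics ->
                %% recovery finished (or the member is in another state)
                Metrics
        end.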
@@ -969,6 +975,7 @@ handle_follower(#append_entries_rpc{term = Term, current_term := CurTerm}) when Term >= CurTerm -> ok = incr_counter(Cfg, ?C_RA_SRV_AER_RECEIVED_FOLLOWER, 1), + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, LeaderCommit), %% this is a valid leader, append entries message Effects0 = [{record_leader_msg, LeaderId}], State0 = update_term(Term, State00#{leader_id => LeaderId, @@ -1129,15 +1136,14 @@ handle_follower(#request_vote_rpc{term = Term, candidate_id = Cand, [LogId, Cand, {LLIdx, LLTerm}, Term, CurTerm]), Reply = #request_vote_result{term = Term, vote_granted = true}, State = update_term_and_voted_for(Term, Cand, State1), - {follower, State#{voted_for => Cand, current_term => Term}, - [{reply, Reply}]}; + {follower, State, [{reply, Reply}]}; false -> ?INFO("~ts: declining vote for ~w for term ~b," " candidate last log index term was: ~w~n" " last log entry idxterm seen was: ~w", [LogId, Cand, Term, {LLIdx, LLTerm}, {LastIdxTerm}]), Reply = #request_vote_result{term = Term, vote_granted = false}, - {follower, State1#{current_term => Term}, [{reply, Reply}]} + {follower, update_term(Term, State1), [{reply, Reply}]} end; handle_follower(#request_vote_rpc{term = Term, candidate_id = Candidate}, State = #{current_term := CurTerm, @@ -1189,8 +1195,8 @@ handle_follower(#install_snapshot_rpc{term = Term, SnapState0 = ra_log:snapshot_state(Log0), {ok, SS} = ra_snapshot:begin_accept(Meta, SnapState0), Log = ra_log:set_snapshot_state(SS, Log0), - {receive_snapshot, State0#{log => Log, - leader_id => LeaderId}, + {receive_snapshot, update_term(Term, State0#{log => Log, + leader_id => LeaderId}), [{next_event, Rpc}, {record_leader_msg, LeaderId}]}; handle_follower(#request_vote_result{}, State) -> %% handle to avoid logging as unhandled @@ -1262,19 +1268,19 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, end, {#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log), - State = State0#{cfg => Cfg, - log => Log, - current_term => Term, - commit_index => SnapIndex, - last_applied => SnapIndex, - cluster => make_cluster(Id, ClusterIds), - machine_state => MacState}, + State = update_term(Term, + State0#{cfg => Cfg, + log => Log, + commit_index => SnapIndex, + last_applied => SnapIndex, + cluster => make_cluster(Id, ClusterIds), + machine_state => MacState}), %% it was the last snapshot chunk so we can revert back to %% follower status {follower, persist_last_applied(State), [{reply, Reply} | Effs]}; next -> Log = ra_log:set_snapshot_state(SnapState, Log0), - State = State0#{log => Log}, + State = update_term(Term, State0#{log => Log}), {receive_snapshot, State, [{reply, Reply}]} end; handle_receive_snapshot({ra_log_event, Evt}, @@ -2144,6 +2150,7 @@ update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg, ok = ra_log_meta:store(MetaName, UId, current_term, Term), ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor), incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1), + put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term), reset_query_index(State#{current_term => Term, voted_for => VotedFor}) end. 
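A pattern worth noting across the hunks above: every path that adopts a higher term (vote handling, snapshot installation, receive_snapshot) now funnels through update_term/2 or update_term_and_voted_for/3 instead of writing current_term into the state directly, so the persisted metadata and the new term counter always move together. A rough sketch of the invariant those helpers maintain (not the actual implementation, which also persists the term via ra_log_meta and publishes it to the counters):

    update_term(Term, #{current_term := CurTerm} = State)
      when Term > CurTerm ->
        %% persist and publish Term, then adopt it
        State#{current_term => Term};
    update_term(_Term, State) ->
        State.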
@@ -2237,7 +2244,7 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0, #{last_applied := LastApplied, cfg := #cfg{machine_version = MacVer, effective_machine_module = MacMod, - effective_machine_version = EffMacVer}, + effective_machine_version = EffMacVer} = Cfg, machine_state := MacState0, log := Log0} = State0) when ApplyTo > LastApplied andalso MacVer >= EffMacVer -> @@ -2257,6 +2264,8 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0, %% due to machine versioning all entries may not have been applied %% FinalEffs = make_notify_effects(Notifys, lists:reverse(Effects)), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, AppliedTo), + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, CommitLatency), {State#{last_applied => AppliedTo, log => Log, commit_latency => CommitLatency, @@ -2549,12 +2558,14 @@ append_entries_reply(Term, Success, State = #{log := Log}) -> last_index = LWIdx, last_term = LWTerm}. -evaluate_quorum(#{commit_index := CI0} = State0, Effects0) -> +evaluate_quorum(#{cfg := Cfg, + commit_index := CI0} = State0, Effects0) -> % TODO: shortcut function if commit index was not incremented State = #{commit_index := CI} = increment_commit_index(State0), Effects = case CI > CI0 of true -> + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CI), [{aux, eval} | Effects0]; false -> Effects0 @@ -2799,6 +2810,11 @@ incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> incr_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. +put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> + counters:put(Cnt, Ix, N); +put_counter(#cfg{counter = undefined}, _Ix, _N) -> + ok. + meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> Name; meta_name(#{names := #{log_meta := Name}}) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index e13ec1b8..49e8e4d9 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -270,6 +270,8 @@ init(Config) -> do_init(#{id := Id, cluster_name := ClusterName} = Config0) -> + Key = ra_lib:ra_server_id_to_local_name(Id), + true = ets:insert(ra_state, {Key, init}), process_flag(trap_exit, true), Config = #{counter := Counter, system_config := SysConf} = maps:merge(config_defaults(Id), @@ -283,7 +285,6 @@ do_init(#{id := Id, UId = ra_server:uid(ServerState), % ensure ra_directory has the new pid #{names := Names} = SysConf, - Key = ra_lib:ra_server_id_to_local_name(Id), ok = ra_directory:register_name(Names, UId, self(), maps:get(parent, Config, undefined), Key, ClusterName), @@ -1527,7 +1528,7 @@ config_defaults(ServerId) -> install_snap_rpc_timeout => ?INSTALL_SNAP_RPC_TIMEOUT, await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT, initial_members => [], - counter => ra_counters:new(ServerId, ?RA_COUNTER_FIELDS), + counter => ra_counters:new(ServerId, {persistent_term, ?FIELDSPEC_KEY}), system_config => ra_system:default_config() }. diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 8a96c012..0ecf9e42 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -42,7 +42,8 @@ all_tests() -> local_log_effect, leaderboard, bench, - disconnected_node_catches_up + disconnected_node_catches_up, + key_metrics ]. groups() -> @@ -381,6 +382,76 @@ disconnected_node_catches_up(Config) -> [ok = slave:stop(S) || {_, S} <- ServerIds], ok. 
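Note that ra_server_proc's do_init above now records {Name, init} in the ra_state ETS table before recovery begins, rather than only registering once start-up completes; combined with the ets:lookup in ra:key_metrics/1, callers see state init or recover instead of unknown while the server is still starting. The test added below exercises the stopped, recovering and steady states.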
+key_metrics(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + Machine = {module, ?MODULE, #{}}, + {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds), + {ok, _, Leader} = ra:members(hd(Started)), + + Data = crypto:strong_rand_bytes(1024), + [begin + ok = ra:pipeline_command(Leader, {data, Data}) + end || _ <- lists:seq(1, 10000)], + {ok, _, _} = ra:process_command(Leader, {data, Data}), + + timer:sleep(100), + TestId = lists:last(Started), + ok = ra:stop_server(?SYS, TestId), + StoppedMetrics = ra:key_metrics(TestId), + ct:pal("StoppedMetrics ~p", [StoppedMetrics]), + ?assertMatch(#{state := noproc, + last_applied := LA, + last_written_index := LW, + commit_index := CI} + when LA > 0 andalso + LW > 0 andalso + CI > 0, + StoppedMetrics), + ok = ra:restart_server(?SYS, TestId), + await_condition( + fun () -> + Metrics = ra:key_metrics(TestId), + ct:pal("RecoverMetrics ~p", [Metrics]), + recover == maps:get(state, Metrics) + end, 200), + {ok, _, _} = ra:process_command(Leader, {data, Data}), + await_condition( + fun () -> + Metrics = ra:key_metrics(TestId), + ct:pal("FollowerMetrics ~p", [Metrics]), + follower == maps:get(state, Metrics) + end, 200), + [begin + M = ra:key_metrics(S), + ct:pal("Metrics ~p", [M]), + ?assertMatch(#{state := _, + last_applied := LA, + last_written_index := LW, + commit_index := CI} + when LA > 0 andalso + LW > 0 andalso + CI > 0, M) + end + || S <- Started], + ok = ra:transfer_leadership(Leader, TestId), + timer:sleep(1000), + [begin + M = ra:key_metrics(S), + ct:pal("Metrics ~p", [M]), + ?assertMatch(#{state := _, + last_applied := LA, + last_written_index := LW, + commit_index := CI} + when LA > 0 andalso + LW > 0 andalso + CI > 0, M) + end || S <- Started], + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + leaderboard(Config) -> PrivDir = ?config(data_dir, Config), @@ -537,6 +608,8 @@ apply(#{index := Idx}, {do_local_log, SenderPid, Opts}, State) -> end, {local, node(SenderPid)}}, {State, ok, [Eff]}; +apply(#{index := _Idx}, {data, _}, State) -> + {State, ok, []}; apply(#{index := Idx}, _Cmd, State) -> {State, ok, [{release_cursor, Idx, State}]}. From 8d426341d784fa288f0b98ec406b99d082e773de Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 1 Sep 2023 13:10:58 +0100 Subject: [PATCH 05/14] speed up ra_log_2_SUITE --- test/ra_log_2_SUITE.erl | 125 +++++++++++++++++++++++++++++++--------- 1 file changed, 99 insertions(+), 26 deletions(-) diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 977fefd7..ff81a67d 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -116,8 +116,9 @@ handle_overwrite(Config) -> {2, 2} = ra_log:last_written( element(1, ra_log:handle_event({written, {1, 2, 2}}, Log))), ok = ra_log_wal:force_roll_over(ra_log_wal), - _ = deliver_all_log_events(Log, 1000), + _ = deliver_all_log_events(Log, 100), ra_log:close(Log), + flush(), ok. 
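The changes below (and through the rest of this suite) replace fixed sleeps with deliver_log_events_cond/3, a helper added at the bottom of this file's diff that keeps delivering ra_log_event messages to the log until a caller-supplied condition holds or the attempt budget runs out.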
receive_segment(Config) -> @@ -125,21 +126,30 @@ receive_segment(Config) -> % write a few entries Entries = [{I, 1, <<"value_", I:32/integer>>} || I <- lists:seq(1, 3)], + {PreWritten, _} = ra_log:last_written(Log0), Log1 = lists:foldl(fun(E, Acc0) -> ra_log:append(E, Acc0) end, Log0, Entries), - Log2 = deliver_all_log_events(Log1, 500), + % Log2 = deliver_all_log_events(Log1, 500), + Log2 = deliver_log_events_cond( + Log1, fun (L) -> + {PostWritten, _} = ra_log:last_written(L), + PostWritten >= (PreWritten + 3) + end, 100), {3, 1} = ra_log:last_written(Log2), UId = ?config(uid, Config), [MemTblTid] = [Tid || {Key, _, _, Tid} <- ets:tab2list(ra_log_open_mem_tables), Key == UId], + ?assert(ets:info(MemTblTid) =/= undefined), % force wal roll over ok = ra_log_wal:force_roll_over(ra_log_wal), - Log3 = deliver_all_log_events(Log2, 1500), - % validate ets table has been recovered - ?assert(lists:member(MemTblTid, ets:all()) =:= false), - [] = ets:tab2list(ra_log_open_mem_tables), - [] = ets:tab2list(ra_log_closed_mem_tables), + % Log3 = deliver_all_log_events(Log2, 1500), + Log3 = deliver_log_events_cond( + Log2, fun (_L) -> + ets:info(MemTblTid) == undefined andalso + [] =:= ets:tab2list(ra_log_open_mem_tables) andalso + [] =:= ets:tab2list(ra_log_closed_mem_tables) + end, 100), % validate reads {Entries, FinalLog} = ra_log_take(1, 3, Log3), ?assertEqual(length(Entries), 3), @@ -226,7 +236,13 @@ validate_reads_for_overlapped_writes(Config) -> % write 350 - 500 in term 2 Log4 = write_and_roll(350, 500, 2, Log3), Log5 = write_n(500, 551, 2, Log4), - Log6 = deliver_all_log_events(Log5, 200), + % Log6 = deliver_all_log_events(Log5, 200), + % ct:pal("LAST ~p", [ra_log:last_written(Log6)]), + Log6 = deliver_log_events_cond( + Log5, fun (L) -> + {W, _} = ra_log:last_written(L), + W >= 550 + end, 100), Log7 = validate_fold(1, 199, 1, Log6), Log8 = validate_fold(200, 550, 2, Log7), @@ -578,7 +594,7 @@ wal_crash_recover(Config) -> spawn(fun () -> proc_lib:stop(ra_log_segment_writer) end), Log3 = write_n(75, 100, 2, Log2), % wait long enough for the resend window to pass - timer:sleep(2000), + timer:sleep(1000), Log = assert_log_events(write_n(100, 101, 2, Log3), fun (L) -> {100, 2} == ra_log:last_written(L) @@ -961,11 +977,25 @@ missed_closed_tables_are_deleted_at_next_opportunity(Config) -> {Log6, _} = ra_log:update_release_cursor(154, #{?N1 => new_peer(), ?N2 => new_peer()}, 1, initial_state, Log5), - _Log = deliver_all_log_events(Log6, 500), - - [_] = find_segments(Config), + deliver_log_events_cond(Log6, + fun (_) -> + case find_segments(Config) of + [_] -> true; + _ -> false + end + end, 100), ok. +await_cond(_Fun, 0) -> + false; +await_cond(Fun, N) -> + case Fun() of + true -> true; + false -> + timer:sleep(250), + await_cond(Fun, N -1) + end. 
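await_cond/2 above retries its probe every 250 ms up to N times and returns a boolean rather than failing the test outright. A hypothetical call (the probe shown is an illustration, not taken from this patch):

    true = await_cond(
             fun () -> [] =:= ets:tab2list(ra_log_open_mem_tables) end,
             20),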
+ transient_writer_is_handled(Config) -> Self = self(), UId2 = <<(?config(uid, Config))/binary, "sub_proc">>, @@ -1013,8 +1043,6 @@ external_reader(Config) -> Log1 = write_n(200, 220, 2, write_and_roll(1, 200, 2, Log0)), - timer:sleep(1000), - Self = self(), Pid = spawn( fun () -> @@ -1054,18 +1082,17 @@ external_reader(Config) -> end, ra_log_wal:force_roll_over(ra_log_wal), - _Log3 = deliver_all_log_events(Log2, 500), - - %% this should result in a segment update - receive - {got, Evt2, Entries1} -> - ct:pal("got segs: ~w ~w", [Evt2, length(Entries1)]), - ok - after 2000 -> - flush(), - exit(got_timeout_2) - end, - timer:sleep(2000), + deliver_log_events_cond( + Log2, fun (_L) -> + %% this should result in a segment update + receive + {got, Evt2, Entries1} -> + ct:pal("got segs: ~p ~p", [Evt2, length(Entries1)]), + true + after 10 -> + false + end + end, 100), flush(), ok. @@ -1139,6 +1166,52 @@ write_n(From, To, Term, Log0) -> %% Utility functions +deliver_log_events_cond(_Log0, _CondFun, 0) -> + flush(), + ct:fail("condition did not manifest"); +deliver_log_events_cond(Log0, CondFun, N) -> + receive + {ra_log_event, Evt} -> + ct:pal("log evt: ~p", [Evt]), + {Log1, Effs} = ra_log:handle_event(Evt, Log0), + Log2 = lists:foldl( + fun({send_msg, P, E}, Acc) -> + P ! E, + Acc; + ({next_event, {ra_log_event, E}}, Acc0) -> + {Acc, _} = ra_log:handle_event(E, Acc0), + Acc; + (_, Acc) -> + Acc + end, Log1, Effs), + [P ! E || {send_msg, P, E, _} <- Effs], + case CondFun(Log2) of + {false, Log} -> + deliver_log_events_cond(Log, CondFun, N-1); + false -> + deliver_log_events_cond(Log2, CondFun, N-1); + {true, Log} -> + ct:pal("condition was true!!"), + Log; + true -> + ct:pal("condition was true!"), + Log2 + end + after 100 -> + case CondFun(Log0) of + {false, Log} -> + deliver_log_events_cond(Log, CondFun, N-1); + false -> + deliver_log_events_cond(Log0, CondFun, N-1); + {true, Log} -> + ct:pal("condition was true!"), + Log; + true -> + ct:pal("condition was true!"), + Log0 + end + end. 
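For reference, the call shape used by the tests above, e.g. in receive_segment/1:

    Log2 = deliver_log_events_cond(
             Log1,
             fun (L) ->
                     {PostWritten, _} = ra_log:last_written(L),
                     PostWritten >= (PreWritten + 3)
             end, 100),

The condition may also return {true, Log} or {false, Log} when it needs to hand an updated log state back to the loop, which the helper matches explicitly.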
+ deliver_all_log_events(Log0, Timeout) -> receive {ra_log_event, Evt} -> From 2a3d2c1ecd85d2800978d5cc4d303b577774ac90 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 5 Sep 2023 15:43:39 +0100 Subject: [PATCH 06/14] v2.7.0-pre.1 --- src/ra.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ra.app.src b/src/ra.app.src index e85bd56d..acf09593 100644 --- a/src/ra.app.src +++ b/src/ra.app.src @@ -1,6 +1,6 @@ {application,ra, [{description,"Raft library"}, - {vsn,"2.6.3"}, + {vsn,"2.7.0-pre.1"}, {licenses,["Apache-2.0","MPL-2.0"]}, {links,[{"github","https://github.com/rabbitmq/ra"}]}, {modules,[]}, From 5bf45899009defbfe52928a2baeb50460dda4bf7 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 5 Sep 2023 16:39:09 +0100 Subject: [PATCH 07/14] coordination_SUITE: reliability --- src/ra_directory.erl | 6 +- test/coordination_SUITE.erl | 122 +++++++++++++++++++----------------- 2 files changed, 66 insertions(+), 62 deletions(-) diff --git a/src/ra_directory.erl b/src/ra_directory.erl index d50d6182..0a393fea 100644 --- a/src/ra_directory.erl +++ b/src/ra_directory.erl @@ -43,9 +43,9 @@ init(System) when is_atom(System) -> init(Dir, #{directory := Name, directory_rev := NameRev}) -> _ = ets:new(Name, [named_table, - public, - {read_concurrency, true} - ]), + public, + {read_concurrency, true} + ]), ok = ra_lib:make_dir(Dir), Dets = filename:join(Dir, "names.dets"), {ok, NameRev} = dets:open_file(NameRev, diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 0ecf9e42..a2c1c630 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -105,7 +105,6 @@ start_stop_restart_delete_on_remote(Config) -> ok = ra:force_delete_server(?SYS, NodeId), % idempotency ok = ra:force_delete_server(?SYS, NodeId), - timer:sleep(500), slave:stop(S1), ok. 
@@ -139,7 +138,6 @@ start_or_restart_cluster(Config) -> PingResults = [{pong, _} = ra_server_proc:ping(N, 500) || N <- NodeIds], % assert one node is leader ?assert(lists:any(fun ({pong, S}) -> S =:= leader end, PingResults)), - % timer:sleep(1000), [ok = slave:stop(S) || {_, S} <- NodeIds], NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], %% this should restart @@ -191,15 +189,19 @@ delete_one_server_cluster(Config) -> delete_two_server_cluster(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), - NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2]], + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2]], Machine = {module, ?MODULE, #{}}, - {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds), - {ok, _} = ra:delete_cluster(NodeIds), - timer:sleep(1000), - {error, _} = ra_server_proc:ping(hd(tl(NodeIds)), 50), - {error, _} = ra_server_proc:ping(hd(NodeIds), 50), - % assert all nodes are actually started - [ok = slave:stop(S) || {_, S} <- NodeIds], + {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds), + {ok, _} = ra:delete_cluster(ServerIds), + % timer:sleep(1000), + await_condition( + fun () -> + lists:all( + fun ({Name, Node}) -> + undefined == erpc:call(Node, erlang, whereis, [Name]) + end, ServerIds) + end, 100), + [ok = slave:stop(S) || {_, S} <- ServerIds], receive Anything -> ct:pal("got weird message ~p", [Anything]), @@ -212,36 +214,43 @@ delete_two_server_cluster(Config) -> delete_three_server_cluster(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), - NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], Machine = {module, ?MODULE, #{}}, - {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds), - {ok, _} = ra:delete_cluster(NodeIds), - timer:sleep(250), - {error, _} = ra_server_proc:ping(hd(tl(NodeIds)), 50), - {error, _} = ra_server_proc:ping(hd(NodeIds), 50), - % assert all nodes are actually started - [ok = slave:stop(S) || {_, S} <- NodeIds], + {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds), + {ok, _} = ra:delete_cluster(ServerIds), + await_condition( + fun () -> + lists:all( + fun ({Name, Node}) -> + undefined == erpc:call(Node, erlang, whereis, [Name]) + end, ServerIds) + end, 100), + [ok = slave:stop(S) || {_, S} <- ServerIds], ok. 
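A note on the attempt budgets used in these probes: await_condition/2 (updated at the end of this patch to swallow exceptions) sleeps 100 ms between attempts, so a budget of 100 gives each condition roughly ten seconds to become true before the test fails.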
delete_three_server_cluster_parallel(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), - NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], Machine = {module, ?MODULE, #{}}, - {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds), + {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds), %% spawn a delete command to try cause it to commit more than %% one delete command - spawn(fun () -> {ok, _} = ra:delete_cluster(NodeIds) end), - spawn(fun () -> {ok, _} = ra:delete_cluster(NodeIds) end), - {ok, _} = ra:delete_cluster(NodeIds), - timer:sleep(250), - {error, _} = ra_server_proc:ping(hd(tl(NodeIds)), 50), - {error, _} = ra_server_proc:ping(hd(NodeIds), 50), + spawn(fun () -> {ok, _} = ra:delete_cluster(ServerIds) end), + spawn(fun () -> {ok, _} = ra:delete_cluster(ServerIds) end), + {ok, _} = ra:delete_cluster(ServerIds), + await_condition( + fun () -> + lists:all( + fun ({Name, Node}) -> + undefined == erpc:call(Node, erlang, whereis, [Name]) + end, ServerIds) + end, 100), [begin true = rpc:call(S, ?MODULE, check_sup, []) - end || {_, S} <- NodeIds], + end || {_, S} <- ServerIds], % assert all nodes are actually started - [ok = slave:stop(S) || {_, S} <- NodeIds], + [ok = slave:stop(S) || {_, S} <- ServerIds], ok. check_sup() -> @@ -342,6 +351,7 @@ disconnected_node_catches_up(Config) -> {ok, _, Leader} = ra:members(hd(Started)), [{_, DownServerNode} = DownServerId, _] = Started -- [Leader], + %% the ra_directory DETS table has a 500ms autosave configuration timer:sleep(1000), ok = slave:stop(DownServerNode), @@ -365,11 +375,12 @@ disconnected_node_catches_up(Config) -> <> -> binary_to_atom(Tag, utf8) end, - timer:sleep(5000), - start_follower(DownServerNodeName, PrivDir), - ok = ra:restart_server(?SYS, DownServerId), + await_condition( + fun () -> + ok == ra:restart_server(?SYS, DownServerId) + end, 100), %% wait for snapshot on restarted server await_condition( @@ -435,19 +446,6 @@ key_metrics(Config) -> CI > 0, M) end || S <- Started], - ok = ra:transfer_leadership(Leader, TestId), - timer:sleep(1000), - [begin - M = ra:key_metrics(S), - ct:pal("Metrics ~p", [M]), - ?assertMatch(#{state := _, - last_applied := LA, - last_written_index := LW, - commit_index := CI} - when LA > 0 andalso - LW > 0 andalso - CI > 0, M) - end || S <- Started], [ok = slave:stop(S) || {_, S} <- ServerIds], ok. 
@@ -464,24 +462,30 @@
     %% synchronously get leader
     {ok, _, Leader} = ra:members(hd(Started)),
 
-    timer:sleep(500),
     %% assert leaderboard has correct leader on all nodes
-    [begin
-         L = rpc:call(N, ra_leaderboard, lookup_leader, [ClusterName]),
-         ct:pal("~w has ~w as leader expected ~w", [N, L, Leader]),
-         ?assertEqual(Leader, L)
-     end || {_, N} <- NodeIds],
+    await_condition(
+      fun () ->
+              lists:all(fun (B) -> B end,
+                        [begin
+                             L = rpc:call(N, ra_leaderboard, lookup_leader, [ClusterName]),
+                             ct:pal("~w has ~w as leader expected ~w", [N, L, Leader]),
+                             Leader == L
+                         end || {_, N} <- NodeIds])
+      end, 100),
 
     NextLeader = hd(lists:delete(Leader, Started)),
     ok = ra:transfer_leadership(Leader, NextLeader),
     {ok, _, NewLeader} = ra:members(hd(Started)),
 
-    timer:sleep(500),
-    [begin
-         L = rpc:call(N, ra_leaderboard, lookup_leader, [ClusterName]),
-         ct:pal("~w has ~w as leader expected ~w", [N, L, NewLeader]),
-         ?assertEqual(NewLeader, L)
-     end || {_, N} <- NodeIds],
+    await_condition(
+      fun () ->
+              lists:all(fun (B) -> B end,
+                        [begin
+                             L = rpc:call(N, ra_leaderboard, lookup_leader, [ClusterName]),
+                             ct:pal("~w has ~w as leader expected ~w", [N, L, NewLeader]),
+                             NewLeader == L
+                         end || {_, N} <- NodeIds])
+      end, 100),
 
     [ok = slave:stop(S) || {_, S} <- NodeIds],
     ok.
 
@@ -580,7 +584,7 @@ start_follower(N, PrivDir) ->
     ct:pal("starting secondary node with ~ts on host ~ts for node ~ts", [Pa, Host, node()]),
     {ok, S} = slave:start_link(Host, N, Pa),
     ok = ct_rpc:call(S, ?MODULE, node_setup, [PrivDir]),
-    _ = erpc:call(S, ra, start, []),
+    ok = erpc:call(S, ra, start, []),
     ok = ct_rpc:call(S, logger, set_primary_config, [level, all]),
     S.
 
@@ -632,9 +636,9 @@ node_setup(DataDir) ->
 await_condition(_Fun, 0) ->
     exit(condition_did_not_materialise);
 await_condition(Fun, Attempts) ->
-    case Fun() of
+    case catch Fun() of
         true -> ok;
-        false ->
+        _ ->
             timer:sleep(100),
             await_condition(Fun, Attempts - 1)
     end.
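The switch to case catch Fun() of in await_condition/2 means a probe that raises, for example an rpc against a node that is still booting, simply counts as a failed attempt instead of crashing the test. That is what makes probes such as this one from earlier in the patch safe to retry:

    await_condition(
      fun () ->
              ok == ra:restart_server(?SYS, DownServerId)
      end, 100),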
From 85b2d942953bf6907028680d15dff4426821a49a Mon Sep 17 00:00:00 2001 From: Rin Kuryloski Date: Thu, 7 Sep 2023 10:50:31 +0200 Subject: [PATCH 08/14] Add automation to publish to our bzlmod registry on release --- .github/workflows/publish-rabbitmq-bcr.yml | 76 ++++++++++++++++++++++ .github/workflows/release.yml | 1 - 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/publish-rabbitmq-bcr.yml diff --git a/.github/workflows/publish-rabbitmq-bcr.yml b/.github/workflows/publish-rabbitmq-bcr.yml new file mode 100644 index 00000000..373b0f55 --- /dev/null +++ b/.github/workflows/publish-rabbitmq-bcr.yml @@ -0,0 +1,76 @@ +name: Add to rabbitmq/bazel-central-registry@erlang-packages +on: + release: + types: [published] + workflow_dispatch: +jobs: + add-module: + runs-on: ubuntu-latest + steps: + - name: CHECKOUT + uses: actions/checkout@v3 + with: + path: ra + - name: CHECKOUT rabbitmq/bazel-central-registry@erlang-packages + uses: actions/checkout@v3 + with: + repository: rabbitmq/bazel-central-registry + path: bazel-central-registry + ref: erlang-packages + - name: PUBLISH TO rabbitmq/bazel-central-registry@erlang-packages + working-directory: bazel-central-registry + env: + MODULE_NAME: rabbitmq_ra + REPO_NAME: ra + run: | + VERSION="${{ github.ref_name }}" + VERSION="${VERSION#v}" + MAJOR="${VERSION:0:1}" + + echo "VERSION: ${VERSION}" + echo "MAJOR: ${MAJOR}" + + cat << EOF > ${MODULE_NAME}.json + { + "build_file": null, + "build_targets": [ + "@${MODULE_NAME}//:${MODULE_NAME}", + "@${MODULE_NAME}//:erlang_app" + ], + "compatibility_level": "$((${MAJOR} - 1))", + "deps": [], + "module_dot_bazel": "${{ github.workspace }}/${REPO_NAME}/MODULE.bazel", + "name": "${MODULE_NAME}", + "patch_strip": 0, + "patches": [], + "presubmit_yml": "${{ github.workspace }}/${REPO_NAME}/.bcr/presubmit.yml", + "strip_prefix": "${REPO_NAME}-${VERSION}", + "test_module_build_targets": [], + "test_module_path": null, + "test_module_test_targets": [], + "url": "https://github.com/${{ github.repository }}/releases/download/v${VERSION}/${REPO_NAME}-${VERSION}.tar.gz", + "version": "${VERSION}" + } + EOF + + jq '.' ${MODULE_NAME}.json + + bazel run //tools:add_module -- \ + --input=${MODULE_NAME}.json + + git diff + - name: CREATE PULL REQUEST + uses: peter-evans/create-pull-request@v5.0.2 + with: + token: ${{ secrets.REPO_SCOPED_TOKEN }} + path: bazel-central-registry + title: Add ${{ github.repository }}@${{ github.ref_name }} + body: > + Automated changes created by + ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + using the [create-pull-request](https://github.com/peter-evans/create-pull-request) + GitHub action in the ${{ github.workflow }} workflow. 
+ commit-message: | + Add ${{ github.repository }}@${{ github.ref_name }} + branch: add-${{ github.repository }}@${{ github.ref_name }} + delete-branch: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 2d58e313..3d84bf44 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -43,7 +43,6 @@ jobs: uses: ncipollo/release-action@v1.12.0 with: allowUpdates: true - draft: true artifactErrorsFailBuild: true updateOnlyUnreleased: true generateReleaseNotes: true From 6090f5fee129dc9ca2b9b97d0d2a9cb7a49a76b0 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 7 Sep 2023 16:29:07 +0400 Subject: [PATCH 09/14] Bump seshat dependency in rebar.config --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 95c22c9a..8df77ea5 100644 --- a/rebar.config +++ b/rebar.config @@ -1,7 +1,7 @@ {deps, [ {gen_batch_server, "0.8.8"}, {aten, "0.5.8"}, - {seshat, "0.4.0"} + {seshat, "0.6.0"} ]}. From cb5bc3c3c900cfc54a6fedb61c69237f5fa945cf Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 8 Sep 2023 13:51:26 -0500 Subject: [PATCH 10/14] Bump seshat to 0.6.0 in rebar.lock Despite the dependency at 0.6.0 in the rebar.config, rebar3 doesn't automatically upgrade rebar.lock or use the new dependency version until upgraded in the lockfile. This fixes local uses of rebar like running a cluster in a rebar3 shell. --- rebar.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rebar.lock b/rebar.lock index 9829e0b2..0751c069 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,14 +1,14 @@ {"1.2.0", [{<<"aten">>,{pkg,<<"aten">>,<<"0.5.8">>},0}, {<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.8">>},0}, - {<<"seshat">>,{pkg,<<"seshat">>,<<"0.4.0">>},0}]}. + {<<"seshat">>,{pkg,<<"seshat">>,<<"0.6.0">>},0}]}. [ {pkg_hash,[ {<<"aten">>, <<"B5C97F48517C4F37F26A519AA57A00A31FF1B8EA4324EC1CAE27F818ED5C0DB2">>}, {<<"gen_batch_server">>, <<"7840A1FA63EE1EFFC83E8A91D22664847A2BA1192D30EAFFFD914ACB51578068">>}, - {<<"seshat">>, <<"1D5DC4294E36B8745245AB2649E24E39D7B6B1209D7A6484F2B8D706C35C9814">>}]}, + {<<"seshat">>, <<"3172EB1D7A2A4F66108CD6933A4E465AFF80F84AA90ED83F047B92F636123CCD">>}]}, {pkg_hash_ext,[ {<<"aten">>, <<"64D40A8CF0DDFEA4E13AF00B7327F0925147F83612D0627D9506CBFFE90C13EF">>}, {<<"gen_batch_server">>, <<"C3E6A1A2A0FB62AEE631A98CFA0FD8903E9562422CBF72043953E2FB1D203017">>}, - {<<"seshat">>, <<"2C3DEEC7FF86E0D0C05EDEBD3455C8363123C227BE292FFFFC1A05EEC08BFF63">>}]} + {<<"seshat">>, <<"7CEF700F92831DD7CAE6A6DD223CCC55AC88ECCE0631EE9AB0F2B5FB70E79B90">>}]} ]. From 50b3c332a961caa9492a589d5bb069c29c343332 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 12 Sep 2023 13:05:53 +0100 Subject: [PATCH 11/14] Commit down commands as low priority. This ensures that any pending low priority commands from the monitored process are committed before the down command so that state machines can rely on no further commands arriving from a given process. --- src/ra_server.erl | 7 ++--- test/ra_machine_int_SUITE.erl | 54 +++++++++++++++++++++++++++++++++++ test/ra_server_SUITE.erl | 8 +++--- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/ra_server.erl b/src/ra_server.erl index e69bb923..4a67bff5 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -1874,10 +1874,9 @@ peer_snapshot_process_exited(SnapshotPid, #{cluster := Peers} = State) -> {ra_state(), ra_server_state(), effects()}. 
 handle_down(leader, machine, Pid, Info, State)
   when is_pid(Pid) ->
-    %% commit command to be processed by state machine
-    handle_leader({command, {'$usr', #{ts => erlang:system_time(millisecond)},
-                             {down, Pid, Info}, noreply}},
-                  State);
+    %% commit command to be processed by state machine
+    Eff = {next_event, {command, low, {'$usr', {down, Pid, Info}, noreply}}},
+    {leader, State, [Eff]};
 handle_down(RaftState, snapshot_sender, Pid, Info,
             #{cfg := #cfg{log_id = LogId}} = State)
   when (RaftState == leader orelse
diff --git a/test/ra_machine_int_SUITE.erl b/test/ra_machine_int_SUITE.erl
index 0b8940dc..798ef7d3 100644
--- a/test/ra_machine_int_SUITE.erl
+++ b/test/ra_machine_int_SUITE.erl
@@ -34,6 +34,7 @@ all_tests() ->
      send_msg_with_ra_event_and_cast_options,
      machine_replies,
      leader_monitors,
+     down_follows_all_low_priority_commands,
      follower_takes_over_monitor,
      deleted_cluster_emits_eol_effect,
      machine_state_enter_effects,
@@ -222,6 +223,59 @@ leader_monitors(Config) ->
     ra:stop_server(?SYS, ServerId),
     ok.
 
+down_follows_all_low_priority_commands(Config) ->
+    ClusterName = ?config(cluster_name, Config),
+    {_Name1, _} = ServerId1 = ?config(server_id, Config),
+    {_Name2, _} = ServerId2 = ?config(server_id2, Config),
+    {_Name3, _} = ServerId3 = ?config(server_id3, Config),
+    Cluster = [ServerId1, ServerId2, ServerId3],
+    Mod = ?config(modname, Config),
+    Self = self(),
+    meck:new(Mod, [non_strict]),
+    meck:expect(Mod, init, fun (_) -> [] end),
+    meck:expect(Mod, apply,
+                fun (_, {monitor_me, Pid}, State) ->
+                        ct:pal("monitoring ~p", [Pid]),
+                        {[Pid | State], ok, [{monitor, process, Pid}]};
+                    (_, {down, Pid, _}, State) ->
+                        {lists:delete(Pid, State), ok, []};
+                    (_, {cmd, Pid}, State) ->
+                        % ct:pal("handling ~p", [Cmd]),
+                        case lists:member(Pid, State) of
+                            true ->
+                                {State, ok};
+                            false ->
+                                {State, ok, [{send_msg, Self, {unexpected_cmd, Pid}}]}
+                        end
+                end),
+    ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
+    %% send some commands then exit swiftly
+    spawn(
+      fun () ->
+              {ok, ok, L} = ra:process_command(ServerId1, {monitor_me, self()}),
+              [ra:pipeline_command(L, {cmd, self()}) || _ <- lists:seq(1, 200)],
+              Self ! done,
+              ok
+      end),
+
+    receive
+        done ->
+            receive
+                {unexpected_cmd, _} ->
+                    ct:fail("Unexpected command after down")
+            after 2000 ->
+                      ok
+            end
+    after 5000 ->
+              exit(done_Timeout)
+    end,
+
+
+    ra:stop_server(?SYS, ServerId1),
+    ra:stop_server(?SYS, ServerId2),
+    ra:stop_server(?SYS, ServerId3),
+    ok.
+
 follower_takes_over_monitor(Config) ->
     ClusterName = ?config(cluster_name, Config),
     {_Name1, _} = ServerId1 = ?config(server_id, Config),
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index 45b09508..7b252bea 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -2440,15 +2440,15 @@ receive_snapshot_heartbeat_reply_dropped(_config) ->
 
 handle_down(_config) ->
     State0 = base_state(3, ?FUNCTION_NAME),
-    %% this should commit a command
-    {leader, #{log := Log} = State, _} =
+    %% this should return a next_event effect to commit a command
+    Pid = self(),
+    {leader, State,
+     [{next_event, {command, low, {'$usr', {down, Pid, noproc}, noreply}}}]} =
         ra_server:handle_down(leader, machine, self(), noproc, State0),
-    ?assertEqual({4, 5}, ra_log:last_index_term(Log)),
     %% this should be ignored as may happen if state machine doesn't demonitor
     %% on state changes
     {follower, State, []} = ra_server:handle_down(follower, machine,
                                                   self(), noproc, State),
set_peer_query_index(State, PeerId, QueryIndex) -> From 301ef2f727b05dc9778d8bc8af0994a0b983789a Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 12 Sep 2023 13:47:23 +0100 Subject: [PATCH 12/14] v2.7.0-pre.2 --- src/ra.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ra.app.src b/src/ra.app.src index acf09593..1b392d8c 100644 --- a/src/ra.app.src +++ b/src/ra.app.src @@ -1,6 +1,6 @@ {application,ra, [{description,"Raft library"}, - {vsn,"2.7.0-pre.1"}, + {vsn,"2.7.0-pre.2"}, {licenses,["Apache-2.0","MPL-2.0"]}, {links,[{"github","https://github.com/rabbitmq/ra"}]}, {modules,[]}, From b2bdf630b1cc7e4fc02c9c003935466622e6a992 Mon Sep 17 00:00:00 2001 From: Rin Kuryloski Date: Tue, 12 Sep 2023 15:04:59 +0200 Subject: [PATCH 13/14] Correct the "Release" actions workflow The version check requires erl to be available --- .github/workflows/release.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 3d84bf44..aa33f903 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,6 +11,10 @@ jobs: uses: actions/checkout@v3 with: path: ra + - name: CONFIGURE OTP & ELIXIR + uses: erlef/setup-beam@v1.16 + with: + otp-version: 26 - name: ASSERT VERSIONS id: versions working-directory: ra From ed4617937b14ed2b036334dd93da357592fd8397 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 12 Sep 2023 14:37:45 +0100 Subject: [PATCH 14/14] Update MODULE.bazel to 2.7.0-pre.2 --- MODULE.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MODULE.bazel b/MODULE.bazel index dabf401e..2e7f5708 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -1,6 +1,6 @@ module( name = "rabbitmq_ra", - version = "2.6.3", + version = "2.7.0-pre.2", ) bazel_dep(