diff --git a/src/ra.hrl b/src/ra.hrl index 14503d48..2d6856f5 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -253,6 +253,7 @@ -define(C_RA_SRV_AER_RECEIVED_FOLLOWER_EMPTY, ?C_RA_LOG_RESERVED + 17). -define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18). -define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19). +-define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20). -define(RA_SRV_COUNTER_FIELDS, @@ -294,7 +295,9 @@ {term_and_voted_for_updates, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, counter, "Total number of updates of term and voted for"}, {local_queries, ?C_RA_SRV_LOCAL_QUERIES, counter, - "Total number of local queries"} + "Total number of local queries"}, + {invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter, + "Total number of commands received with an invalid reply-mode"} ]). -define(RA_COUNTER_FIELDS, ?RA_LOG_COUNTER_FIELDS ++ ?RA_SRV_COUNTER_FIELDS). diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 90825689..72de91d1 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -353,33 +353,57 @@ leader(EventType, {local_call, Msg}, State) -> leader(EventType, {leader_cast, Msg}, State) -> leader(EventType, Msg, State); leader(EventType, {command, normal, {CmdType, Data, ReplyMode}}, - #state{server_state = ServerState0} = State0) -> - %% normal priority commands are written immediately - Cmd = make_command(CmdType, EventType, Data, ReplyMode), - {leader, ServerState, Effects} = - ra_server:handle_leader({command, Cmd}, ServerState0), - {State, Actions} = - ?HANDLE_EFFECTS(Effects, EventType, - State0#state{server_state = ServerState}), - {keep_state, State, Actions}; + #state{conf = Conf, server_state = ServerState0} = State0) -> + case validate_reply_mode(ReplyMode) of + ok -> + %% normal priority commands are written immediately + Cmd = make_command(CmdType, EventType, Data, ReplyMode), + {leader, ServerState, Effects} = + ra_server:handle_leader({command, Cmd}, ServerState0), + {State, Actions} = + ?HANDLE_EFFECTS(Effects, EventType, + State0#state{server_state = ServerState}), + {keep_state, State, Actions}; + Error -> + ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1), + case EventType of + {call, From} -> + {keep_state, State0, [{reply, From, Error}]}; + _ -> + {keep_state, State0, []} + end + end; leader(EventType, {command, low, {CmdType, Data, ReplyMode}}, - #state{delayed_commands = Delayed} = State0) -> - %% cache the low priority command until the flush_commands message arrives - - Cmd = make_command(CmdType, EventType, Data, ReplyMode), - %% if there are no prior delayed commands - %% (and thus no action queued to do so) - %% queue a state timeout to flush them - %% We use a cast to ourselves instead of a zero timeout as we want to - %% get onto the back of the erlang mailbox not just the current gen_statem - %% event buffer. - case queue:is_empty(Delayed) of - true -> - ok = gen_statem:cast(self(), flush_commands); - false -> - ok - end, - {keep_state, State0#state{delayed_commands = queue:in(Cmd, Delayed)}, []}; + #state{conf = Conf, delayed_commands = Delayed} = State0) -> + case validate_reply_mode(ReplyMode) of + ok -> + %% cache the low priority command until the flush_commands message + %% arrives + + Cmd = make_command(CmdType, EventType, Data, ReplyMode), + %% if there are no prior delayed commands + %% (and thus no action queued to do so) + %% queue a state timeout to flush them + %% We use a cast to ourselves instead of a zero timeout as we want + %% to get onto the back of the erlang mailbox not just the current + %% gen_statem event buffer. + case queue:is_empty(Delayed) of + true -> + ok = gen_statem:cast(self(), flush_commands); + false -> + ok + end, + State = State0#state{delayed_commands = queue:in(Cmd, Delayed)}, + {keep_state, State, []}; + Error -> + ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1), + case EventType of + {call, From} -> + {keep_state, State0, [{reply, From, Error}]}; + _ -> + {keep_state, State0, []} + end + end; leader(EventType, {aux_command, Cmd}, State0) -> {_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType, Cmd, State0#state.server_state), @@ -1543,6 +1567,19 @@ read_chunks_and_send_rpc(RPC0, Res1 end. +validate_reply_mode(after_log_append) -> + ok; +validate_reply_mode(await_consensus) -> + ok; +validate_reply_mode({notify, Correlation, Pid}) + when (is_integer(Correlation) orelse is_reference(Correlation)) andalso + is_pid(Pid) -> + ok; +validate_reply_mode(noreply) -> + ok; +validate_reply_mode(ReplyMode) -> + {error, {invalid_reply_mode, ReplyMode}}. + make_command(Type, {call, From}, Data, Mode) -> Ts = erlang:system_time(millisecond), {Type, #{from => From, ts => Ts}, Data, Mode}; diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index ac67c9ff..813c4632 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -15,6 +15,10 @@ -define(PROCESS_COMMAND_TIMEOUT, 6000). -define(SYS, default). +%% The dialyzer catches that the given reply mode is not included in the +%% `ra_server:command_reply_mode()' type: +-dialyzer({nowarn_function, [process_command_with_unknown_reply_mode/1]}). + all() -> [ {group, tests} @@ -29,6 +33,7 @@ all_tests() -> start_servers, server_recovery, process_command, + process_command_with_unknown_reply_mode, pipeline_command, pipeline_command_reject, pipeline_command_2_forwards_to_leader, @@ -300,7 +305,19 @@ process_command(Config) -> [A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config), {simple, fun erlang:'+'/2, 9}), - {ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), + {ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), + terminate_cluster(Cluster). + +process_command_with_unknown_reply_mode(Config) -> + [A, _B, _C] = Cluster = + start_local_cluster(3, ?config(test_name, Config), + {simple, fun erlang:'+'/2, 9}), + Command = 5, + ReplyMode = bad_reply_mode, + RaCommand = {'$usr', Command, ReplyMode}, + ?assertEqual({error, {invalid_reply_mode, ReplyMode}}, + ra_server_proc:command(A, RaCommand, + ?PROCESS_COMMAND_TIMEOUT)), terminate_cluster(Cluster). pipeline_command(Config) -> @@ -842,7 +859,7 @@ wait_for_gen_statem_status(Ref, ExpectedStatus, Timeout) case get_gen_statem_status(Ref) of ExpectedStatus -> ok; - OtherStatus when Timeout >= 0 -> + _OtherStatus when Timeout >= 0 -> timer:sleep(500), wait_for_gen_statem_status(Ref, ExpectedStatus, Timeout - 500); OtherStatus ->