Skip to content

Commit

Permalink
Merge pull request #320 from rabbitmq/md-validate-reply-mode
Browse files Browse the repository at this point in the history
Validate command reply modes
  • Loading branch information
michaelklishin authored Oct 19, 2022
2 parents 64b687a + be203b9 commit 17c0068
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 29 deletions.
5 changes: 4 additions & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@
-define(C_RA_SRV_AER_RECEIVED_FOLLOWER_EMPTY, ?C_RA_LOG_RESERVED + 17).
-define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18).
-define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19).
-define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20).


-define(RA_SRV_COUNTER_FIELDS,
Expand Down Expand Up @@ -294,7 +295,9 @@
{term_and_voted_for_updates, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, counter,
"Total number of updates of term and voted for"},
{local_queries, ?C_RA_SRV_LOCAL_QUERIES, counter,
"Total number of local queries"}
"Total number of local queries"},
{invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter,
"Total number of commands received with an invalid reply-mode"}
]).

-define(RA_COUNTER_FIELDS, ?RA_LOG_COUNTER_FIELDS ++ ?RA_SRV_COUNTER_FIELDS).
89 changes: 63 additions & 26 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -353,33 +353,57 @@ leader(EventType, {local_call, Msg}, State) ->
leader(EventType, {leader_cast, Msg}, State) ->
leader(EventType, Msg, State);
leader(EventType, {command, normal, {CmdType, Data, ReplyMode}},
#state{server_state = ServerState0} = State0) ->
%% normal priority commands are written immediately
Cmd = make_command(CmdType, EventType, Data, ReplyMode),
{leader, ServerState, Effects} =
ra_server:handle_leader({command, Cmd}, ServerState0),
{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{keep_state, State, Actions};
#state{conf = Conf, server_state = ServerState0} = State0) ->
case validate_reply_mode(ReplyMode) of
ok ->
%% normal priority commands are written immediately
Cmd = make_command(CmdType, EventType, Data, ReplyMode),
{leader, ServerState, Effects} =
ra_server:handle_leader({command, Cmd}, ServerState0),
{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{keep_state, State, Actions};
Error ->
ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1),
case EventType of
{call, From} ->
{keep_state, State0, [{reply, From, Error}]};
_ ->
{keep_state, State0, []}
end
end;
leader(EventType, {command, low, {CmdType, Data, ReplyMode}},
#state{delayed_commands = Delayed} = State0) ->
%% cache the low priority command until the flush_commands message arrives

Cmd = make_command(CmdType, EventType, Data, ReplyMode),
%% if there are no prior delayed commands
%% (and thus no action queued to do so)
%% queue a state timeout to flush them
%% We use a cast to ourselves instead of a zero timeout as we want to
%% get onto the back of the erlang mailbox not just the current gen_statem
%% event buffer.
case queue:is_empty(Delayed) of
true ->
ok = gen_statem:cast(self(), flush_commands);
false ->
ok
end,
{keep_state, State0#state{delayed_commands = queue:in(Cmd, Delayed)}, []};
#state{conf = Conf, delayed_commands = Delayed} = State0) ->
case validate_reply_mode(ReplyMode) of
ok ->
%% cache the low priority command until the flush_commands message
%% arrives

Cmd = make_command(CmdType, EventType, Data, ReplyMode),
%% if there are no prior delayed commands
%% (and thus no action queued to do so)
%% queue a state timeout to flush them
%% We use a cast to ourselves instead of a zero timeout as we want
%% to get onto the back of the erlang mailbox not just the current
%% gen_statem event buffer.
case queue:is_empty(Delayed) of
true ->
ok = gen_statem:cast(self(), flush_commands);
false ->
ok
end,
State = State0#state{delayed_commands = queue:in(Cmd, Delayed)},
{keep_state, State, []};
Error ->
ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1),
case EventType of
{call, From} ->
{keep_state, State0, [{reply, From, Error}]};
_ ->
{keep_state, State0, []}
end
end;
leader(EventType, {aux_command, Cmd}, State0) ->
{_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType,
Cmd, State0#state.server_state),
Expand Down Expand Up @@ -1543,6 +1567,19 @@ read_chunks_and_send_rpc(RPC0,
Res1
end.

validate_reply_mode(after_log_append) ->
ok;
validate_reply_mode(await_consensus) ->
ok;
validate_reply_mode({notify, Correlation, Pid})
when (is_integer(Correlation) orelse is_reference(Correlation)) andalso
is_pid(Pid) ->
ok;
validate_reply_mode(noreply) ->
ok;
validate_reply_mode(ReplyMode) ->
{error, {invalid_reply_mode, ReplyMode}}.

make_command(Type, {call, From}, Data, Mode) ->
Ts = erlang:system_time(millisecond),
{Type, #{from => From, ts => Ts}, Data, Mode};
Expand Down
21 changes: 19 additions & 2 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
-define(PROCESS_COMMAND_TIMEOUT, 6000).
-define(SYS, default).

%% The dialyzer catches that the given reply mode is not included in the
%% `ra_server:command_reply_mode()' type:
-dialyzer({nowarn_function, [process_command_with_unknown_reply_mode/1]}).

all() ->
[
{group, tests}
Expand All @@ -29,6 +33,7 @@ all_tests() ->
start_servers,
server_recovery,
process_command,
process_command_with_unknown_reply_mode,
pipeline_command,
pipeline_command_reject,
pipeline_command_2_forwards_to_leader,
Expand Down Expand Up @@ -300,7 +305,19 @@ process_command(Config) ->
[A, _B, _C] = Cluster =
start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),
{ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
{ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
terminate_cluster(Cluster).

process_command_with_unknown_reply_mode(Config) ->
[A, _B, _C] = Cluster =
start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),
Command = 5,
ReplyMode = bad_reply_mode,
RaCommand = {'$usr', Command, ReplyMode},
?assertEqual({error, {invalid_reply_mode, ReplyMode}},
ra_server_proc:command(A, RaCommand,
?PROCESS_COMMAND_TIMEOUT)),
terminate_cluster(Cluster).

pipeline_command(Config) ->
Expand Down Expand Up @@ -842,7 +859,7 @@ wait_for_gen_statem_status(Ref, ExpectedStatus, Timeout)
case get_gen_statem_status(Ref) of
ExpectedStatus ->
ok;
OtherStatus when Timeout >= 0 ->
_OtherStatus when Timeout >= 0 ->
timer:sleep(500),
wait_for_gen_statem_status(Ref, ExpectedStatus, Timeout - 500);
OtherStatus ->
Expand Down

0 comments on commit 17c0068

Please sign in to comment.