Skip to content

Commit

Permalink
Validate command reply modes
Browse files Browse the repository at this point in the history
`ra_server:command_reply_mode()` was not previously validated on the
`ra_server`-side. If a command were sent with an unknown reply mode,
the Ra server would never reply or notify the caller.

Sending an unknown reply mode might happen when a client sends a new
command to a Ra server which is operating on an old version of Ra.

This change validates reply modes within the `ra_server`, replying with
an error tuple when an invalid reply mode is passed.
  • Loading branch information
the-mikedavis committed Oct 19, 2022
1 parent 0b8d998 commit 20ba89c
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 24 deletions.
83 changes: 59 additions & 24 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -354,32 +354,54 @@ leader(EventType, {leader_cast, Msg}, State) ->
leader(EventType, Msg, State);
leader(EventType, {command, normal, {CmdType, Data, ReplyMode}},
#state{server_state = ServerState0} = State0) ->
%% normal priority commands are written immediately
Cmd = make_command(CmdType, EventType, Data, ReplyMode),
{leader, ServerState, Effects} =
ra_server:handle_leader({command, Cmd}, ServerState0),
{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{keep_state, State, Actions};
case validate_reply_mode(ReplyMode) of
ok ->
%% normal priority commands are written immediately
Cmd = make_command(CmdType, EventType, Data, ReplyMode),
{leader, ServerState, Effects} =
ra_server:handle_leader({command, Cmd}, ServerState0),
{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{keep_state, State, Actions};
Error ->
case EventType of
{call, From} ->
{keep_state, State0, [{reply, From, Error}]};
_ ->
{keep_state, State0, []}
end
end;
leader(EventType, {command, low, {CmdType, Data, ReplyMode}},
#state{delayed_commands = Delayed} = State0) ->
%% cache the low priority command until the flush_commands message arrives

Cmd = make_command(CmdType, EventType, Data, ReplyMode),
%% if there are no prior delayed commands
%% (and thus no action queued to do so)
%% queue a state timeout to flush them
%% We use a cast to ourselves instead of a zero timeout as we want to
%% get onto the back of the erlang mailbox not just the current gen_statem
%% event buffer.
case queue:is_empty(Delayed) of
true ->
ok = gen_statem:cast(self(), flush_commands);
false ->
ok
end,
{keep_state, State0#state{delayed_commands = queue:in(Cmd, Delayed)}, []};
case validate_reply_mode(ReplyMode) of
ok ->
%% cache the low priority command until the flush_commands message
%% arrives

Cmd = make_command(CmdType, EventType, Data, ReplyMode),
%% if there are no prior delayed commands
%% (and thus no action queued to do so)
%% queue a state timeout to flush them
%% We use a cast to ourselves instead of a zero timeout as we want
%% to get onto the back of the erlang mailbox not just the current
%% gen_statem event buffer.
case queue:is_empty(Delayed) of
true ->
ok = gen_statem:cast(self(), flush_commands);
false ->
ok
end,
State = State0#state{delayed_commands = queue:in(Cmd, Delayed)},
{keep_state, State, []};
Error ->
case EventType of
{call, From} ->
{keep_state, State0, [{reply, From, Error}]};
_ ->
{keep_state, State0, []}
end
end;
leader(EventType, {aux_command, Cmd}, State0) ->
{_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType,
Cmd, State0#state.server_state),
Expand Down Expand Up @@ -1543,6 +1565,19 @@ read_chunks_and_send_rpc(RPC0,
Res1
end.

validate_reply_mode(after_log_append) ->
ok;
validate_reply_mode(await_consensus) ->
ok;
validate_reply_mode({notify, Correlation, Pid})
when (is_integer(Correlation) orelse is_reference(Correlation)) andalso
is_pid(Pid) ->
ok;
validate_reply_mode(noreply) ->
ok;
validate_reply_mode(ReplyMode) ->
{error, {invalid_reply_mode, ReplyMode}}.

make_command(Type, {call, From}, Data, Mode) ->
Ts = erlang:system_time(millisecond),
{Type, #{from => From, ts => Ts}, Data, Mode};
Expand Down
17 changes: 17 additions & 0 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
-define(PROCESS_COMMAND_TIMEOUT, 6000).
-define(SYS, default).

%% The dialyzer catches that the given reply mode is not included in the
%% `ra_server:command_reply_mode()' type:
-dialyzer({nowarn_function, [process_command_with_unknown_reply_mode/1]}).

all() ->
[
{group, tests}
Expand All @@ -29,6 +33,7 @@ all_tests() ->
start_servers,
server_recovery,
process_command,
process_command_with_unknown_reply_mode,
pipeline_command,
pipeline_command_reject,
pipeline_command_2_forwards_to_leader,
Expand Down Expand Up @@ -303,6 +308,18 @@ process_command(Config) ->
{ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
terminate_cluster(Cluster).

process_command_with_unknown_reply_mode(Config) ->
[A, _B, _C] = Cluster =
start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),
Command = 5,
ReplyMode = bad_reply_mode,
RaCommand = {'$usr', Command, ReplyMode},
?assertEqual({error, {invalid_reply_mode, ReplyMode}},
ra_server_proc:command(A, RaCommand,
?PROCESS_COMMAND_TIMEOUT)),
terminate_cluster(Cluster).

pipeline_command(Config) ->
[A, _B, _C] = Cluster =
start_local_cluster(3, ?config(test_name, Config),
Expand Down

0 comments on commit 20ba89c

Please sign in to comment.