Writing a State Machine

For ra to do anything useful you need to provide it with a state machine implementation that solves a particular problem.

To implement a state machine that will be replicated using Raft and ra, implement the ra_machine behaviour. There are two mandatory callbacks that need to be implemented:

-callback init(Conf :: machine_init_args()) -> state().

-callback 'apply'(command_meta_data(), command(), State) ->
    {State, reply(), effects() | effect()} | {State, reply()}.

init/1 returns the initial state when a new instance of the state machine is created. It takes an arbitrary map of configuration parameters.

apply/3 is the primary function and is called for every command in the raft log. It takes a meta data map containing the raft index and term (more on that later), a command and the current state. It returns the new state, a reply that can be returned to the caller if they issued a synchronous call (see: ra:process_command/2) and, optionally, effects.

There are also some optional callbacks that advanced state machines may choose to implement.

A simple KV Store

This example builds a simple key-value store that supports write and read (or put and get) operations.

Writing the Store

Create a new Erlang module named ra_kv using the ra_machine behaviour and export the init/1 and apply/3 functions:

-module(ra_kv).
-behaviour(ra_machine).
-export([init/1, apply/3]).

First we are going to define a type spec for the state and commands that we will use. The state is simply a map of arbitrary keys and values. We can store anything.

-opaque state() :: #{term() => term()}.

-type ra_kv_command() :: {write, Key :: term(), Value :: term()} |
                         {read, Key :: term()}.

To implement init/1 simply return an empty map as the initial state of our kv store.

init(_Config) -> #{}.

To implement the apply/3 function we need to handle each of the commands we support.

apply(_Meta, {write, Key, Value}, State) ->
    {maps:put(Key, Value, State), ok, _Effects = []};
apply(_Meta, {read, Key}, State) ->
    Reply = maps:get(Key, State, undefined),
    {State, Reply, _Effects = []}.

For the {write, Key, Value} command we simply put the key and value into the map and return the new state, an ok reply and an empty list of effects.

For {read, Key} the state is returned unchanged and the reply is the value of the key, or undefined if it does not exist, so that a waiting caller can obtain the value.

And that is it! The state machine is finished.
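Because init/1 and apply/3 are plain functions, the machine can be exercised by hand before it is ever run inside ra. The following is a minimal sketch of that idea, not part of ra itself: the module name ra_kv_check, the smoke_test/0 function and the contents of the meta data map are assumptions made for illustration, and the behaviour attribute is left out so that the module compiles without ra on the code path.

```erlang
-module(ra_kv_check).
-export([init/1, apply/3, smoke_test/0]).

%% Same callbacks as ra_kv above. -behaviour(ra_machine) is omitted so
%% that this sketch compiles without ra on the code path.
init(_Config) -> #{}.

apply(_Meta, {write, Key, Value}, State) ->
    {maps:put(Key, Value, State), ok, []};
apply(_Meta, {read, Key}, State) ->
    {State, maps:get(Key, State, undefined), []}.

%% Drives the machine by hand with three commands.
smoke_test() ->
    %% hypothetical meta data; the callbacks above ignore it
    Meta = #{index => 1, term => 1},
    S0 = ?MODULE:init(#{}),
    %% a write replies ok and returns the updated state
    {S1, ok, []} = ?MODULE:apply(Meta, {write, k, v}, S0),
    %% reads leave the state untouched (S1 is already bound here)
    {S1, v, []} = ?MODULE:apply(Meta, {read, k}, S1),
    {S1, undefined, []} = ?MODULE:apply(Meta, {read, missing}, S1),
    ok.
```

Calling ra_kv_check:smoke_test() returns ok if every match succeeds. The calls to apply/3 are module-qualified because apply/3 is also an auto-imported BIF.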

Running the state machine inside ra

To actually run this we need to configure a ra cluster to use the ra_kv state machine and start it. The simplest way is to use the ra:start_cluster/3 function. It takes a ClusterName that can be a binary, string or atom, a machine configuration and a list of servers that define the initial set of members.

start() ->
    %% the initial cluster members
    Members = [{ra_kv1, node()}, {ra_kv2, node()}, {ra_kv3, node()}],
    %% an arbitrary cluster name
    ClusterName = <<"ra_kv">>,
    %% the config passed to `init/1`, must be a `map`
    Config = #{},
    %% the machine configuration
    Machine = {module, ?MODULE, Config},
    %% ensure ra is started
    application:ensure_all_started(ra),
    %% start a cluster instance running the `ra_kv` machine
    ra:start_cluster(ClusterName, Machine, Members).

If you then start an Erlang shell with make shell or similar and call ra_kv:start/0, it should return something like:

{ok,[{ra_kv3,nonode@nohost},
     {ra_kv2,nonode@nohost},
     {ra_kv1,nonode@nohost}],
    []}

This indicates that all servers in the ra cluster were successfully started. The last element of the tuple contains the servers that were not successfully started. If a quorum of servers could not be started the function would return an error.

Now you can write your first value into the cluster.

2> ra:process_command(ra_kv1, {write, k, v}).
{ok, ok, {ra_kv1,nonode@nohost}}
3> ra:process_command(ra_kv1, {read, k}).
{ok, v, {ra_kv1,nonode@nohost}}
4> ra:process_command(ra_kv1, {write, k, v2}).
{ok, ok, {ra_kv1,nonode@nohost}}
5> ra:process_command(ra_kv1, {read, k}).
{ok, v2, {ra_kv1,nonode@nohost}}

ra:process_command/2 blocks until the command has achieved consensus and has been applied to the state machine on the leader server. It is the simplest way to interact with ra but also the one with the highest latency. To read values consistently we have no choice other than to use it. The returned tuple contains either the raft index and term at which the entry was added to the raft log or the reply returned by the state machine. In the session above the {write, Key, Value} commands reply with ok and the {read, Key} commands reply with the current value of the key.

Providing a client API

We have already added the start/0 function to start a local ra cluster. It would make sense to abstract interactions with the kv store behind a nicer interface than calling ra:process_command/2 directly.

write(Key, Value) ->
    %% it would make sense to cache this to avoid redirection costs when this
    %% server happens not to be the current leader
    Server = ra_kv1,
    case ra:process_command(Server, {write, Key, Value}) of
        {ok, _, _} ->
            ok;
        Err ->
            Err
    end.

read(Key) ->
    Server = ra_kv1,
    case ra:process_command(Server, {read, Key}) of
        {ok, Value, _} ->
            {ok, Value};
        Err ->
            Err
    end.

Effects

Effects are used to separate the state machine logic from the side effects it wants to take inside its environment. Each call to the apply/3 function can return a list of effects for the leader to realise. This includes sending messages, setting up server and process monitors and calling arbitrary functions.

Effects should be a list sorted by execution order, i.e. the effect to be actioned first should be at the head of the list.

Only the leader that first applies an entry will attempt the effect. Followers process the same set of commands but simply throw away any effects returned by the state machine.

Send a message

The {send_msg, pid(), Msg :: term()} effect asynchronously sends a message to the specified pid. Note that ra uses erlang:send/3 with the noconnect and nosuspend options, which are the least reliable message sending options. It does this so that a state machine send_msg effect will never block the main ra process. To ensure message reliability, a normal Automatic Repeat Query (ARQ) like protocol between the state machine and the receiver should be implemented if needed.
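As an illustration of returning send_msg effects, here is a minimal sketch of a kv machine that notifies registered watcher processes on every write. The module name ra_kv_notify, the {watch, Pid} command and the {kv_changed, Key, Value} message shape are all assumptions made for this example. Only the {send_msg, Pid, Msg} effect tuple comes from the text above.

```erlang
-module(ra_kv_notify).
-export([init/1, apply/3]).

%% -behaviour(ra_machine) is omitted so that this sketch compiles
%% without ra on the code path.

%% The state holds the stored data and the pids to notify.
init(_Config) -> #{data => #{}, watchers => []}.

%% Hypothetical command: register a process that wants change notices.
apply(_Meta, {watch, Pid}, #{watchers := Ws} = State) ->
    {State#{watchers := lists:usort([Pid | Ws])}, ok, []};
%% On every write, return one send_msg effect per watcher.
apply(_Meta, {write, Key, Value}, #{data := Data, watchers := Ws} = State) ->
    Effects = [{send_msg, Pid, {kv_changed, Key, Value}} || Pid <- Ws],
    {State#{data := maps:put(Key, Value, Data)}, ok, Effects}.
```

Since delivery is not guaranteed, a receiver in a design like this would probably treat the message as a hint and re-read the key, or acknowledge it through a further command if stronger guarantees are needed.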

Monitors

Use {monitor, process | node, pid() | node()} to ask the ra leader to monitor a process or node. If ra receives a DOWN message for a process it is monitoring, it will commit a {down, pid(), term()} command to the log that the state machine needs to handle. If it detects a monitored node as down or up, it will commit a {nodeup | nodedown, node()} command.

Use {demonitor, process | node, pid() | node()} to stop monitoring a process or a node.
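The monitor round trip can be sketched as a small machine that tracks a set of client processes. The module name ra_sessions and the {register, Pid} and {unregister, Pid} commands are hypothetical. The effect tuples and the {down, Pid, Reason}, nodeup and nodedown commands follow the shapes described above.

```erlang
-module(ra_sessions).
-export([init/1, apply/3]).

%% -behaviour(ra_machine) is omitted so that this sketch compiles
%% without ra on the code path.

%% The state is a map used as a set of registered pids.
init(_Config) -> #{}.

%% Hypothetical command: remember the pid and ask for it to be monitored.
apply(_Meta, {register, Pid}, State) ->
    {State#{Pid => true}, ok, [{monitor, process, Pid}]};
%% Hypothetical command: forget the pid and stop monitoring it.
apply(_Meta, {unregister, Pid}, State) ->
    {maps:remove(Pid, State), ok, [{demonitor, process, Pid}]};
%% Committed to the log when a monitored process goes down.
apply(_Meta, {down, Pid, _Reason}, State) ->
    {maps:remove(Pid, State), ok, []};
%% Node events are ignored in this sketch.
apply(_Meta, {nodeup, _Node}, State) ->
    {State, ok, []};
apply(_Meta, {nodedown, _Node}, State) ->
    {State, ok, []}.
```

Handling {down, Pid, Reason} as an ordinary command means every member removes the session at the same point in the log, which keeps the replicated state identical across servers.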

Call a function

Use the {mod_call, module(), function(), Args :: [term()]} effect to call an arbitrary function. Care needs to be taken not to block the ra process whilst doing so. It is recommended that expensive operations are done in another process.

The mod_call effect is useful for e.g. updating an ets table of committed entries or similar.
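A minimal sketch of the ets use case, under stated assumptions: the module name ra_kv_ets, the table name ra_kv_cache and the demo/0 function are invented for illustration. demo/0 stands in for the leader by calling the function named in the effect directly, which is only an approximation of what ra does.

```erlang
-module(ra_kv_ets).
-export([init/1, apply/3, demo/0]).

%% -behaviour(ra_machine) is omitted so that this sketch compiles
%% without ra on the code path.

init(_Config) -> #{}.

%% Each write also asks for the pair to be mirrored into an ets table.
%% ra_kv_cache is a hypothetical table name.
apply(_Meta, {write, Key, Value}, State) ->
    Effects = [{mod_call, ets, insert, [ra_kv_cache, {Key, Value}]}],
    {maps:put(Key, Value, State), ok, Effects}.

%% Stand-in for the leader: create the table, apply one command and
%% perform the returned effect by hand.
demo() ->
    ra_kv_cache = ets:new(ra_kv_cache, [named_table, public, set]),
    {_State, ok, [{mod_call, M, F, A}]} =
        ?MODULE:apply(#{}, {write, k, v}, init(#{})),
    true = erlang:apply(M, F, A),
    ets:lookup(ra_kv_cache, k).
```

In this sketch the table is public and named so that a process other than its owner can insert into it. In a real deployment the table would have to exist before the effect is realised.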

Update the release cursor (Snapshotting)

To (potentially) trigger a snapshot, return the {release_cursor, RaftIndex, MachineState} effect. This is why the raft index is included in the meta data passed to the apply/3 function. Ra will only create a snapshot if doing so will result in log segments being deleted.
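One possible policy is to emit the effect at a fixed interval of raft indexes. The sketch below assumes that the meta data map exposes the raft index under the key index. The module name ra_kv_snap and the interval of 1000 are arbitrary choices made for illustration.

```erlang
-module(ra_kv_snap).
-export([init/1, apply/3]).

%% -behaviour(ra_machine) is omitted so that this sketch compiles
%% without ra on the code path.

%% Arbitrary interval chosen for this example.
-define(SNAPSHOT_EVERY, 1000).

init(_Config) -> #{}.

%% Assumes the meta data map carries the raft index under `index`.
apply(#{index := Idx}, {write, Key, Value}, State0) ->
    State = maps:put(Key, Value, State0),
    {State, ok, release_cursor(Idx, State)}.

%% Emit the effect only when the index is a multiple of the interval.
release_cursor(Idx, State) when Idx rem ?SNAPSHOT_EVERY =:= 0 ->
    [{release_cursor, Idx, State}];
release_cursor(_Idx, _State) ->
    [].
```

The state passed in the effect is the state after the command at that index has been applied, so that it matches the log position it is released at.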