This guide covers key implementation aspects of Ra. It is intentionally high level since implementation details change often but overall design usually doesn't (or the authors believe Ra is well past that point).
Ra is an implementation of Raft, a distributed consensus protocol. Raft has multiple features that work together. They are reflected in the Ra API.
Ra assumes that there can (and usually will be) multiple Raft clusters in a given Erlang node cluster. In case of a messaging system, a topic or queue can be its own cluster. In a data store a data partition can be its own Raft cluster. Any long lived stateful entity that the user would like to replicate across cluster nodes can use a Raft cluster.
Raft clusters in a single Erlang node cluster are logically independent but do share some Ra infrastructure such as the write-ahead log. This is a practical decision that avoids a lot of concurrent fsync operations, which significantly reduce system throughput.
The Ra API has two main modules:
- ra: handles cluster formation, membership and functions that are not directly related to the Ra state machine
- ra_machine: a behavior that applications implement
Applications interact with Ra clusters by sending commands. The commands are processed by the state machine. In response to every command a new machine state is returned.
The Ra server takes care of Raft leader election, log entry replication, peer failure handling, log reinstallation, and so on. Applications are responsible for expressing their logic in terms of Ra commands and effects.
There are two mandatory ra_machine callbacks that need to be implemented:
-callback init(Conf :: machine_init_args()) -> state().
-callback 'apply'(command_meta_data(), command(), State) ->
{State, reply(), effects() | effect()} | {State, reply()}.
init/1 returns the initial state when a new instance of the state machine is created. It takes an arbitrary map of configuration parameters. The parameters are application-specific.
apply/3 is the primary function that is called for every command in the Raft log. It takes the Raft state machine metadata as a map (most importantly the Raft index and current term), a command and the current state.
It must return the new state, a reply and, optionally, a list of effects (in execution order).
Replies will be returned to the caller only if they performed a synchronous call or they requested an asynchronous notification using ra:pipeline_command/3|4.
-type effects() :: [effect()].
-type reply() :: term().
-type command() :: user_command() | builtin_command().
-type command_meta_data() :: ra_server:command_meta() | #{index := ra_index(),
term := ra_term()}.
-spec apply(machine(), command_meta_data(), command(), State) ->
{State, reply(), effects() | effect()} | {State, reply()}.
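To make the two mandatory callbacks concrete, here is a minimal sketch of a counter state machine. The module name, the {add, N} command and the initial configuration key are hypothetical and only serve as an illustration; this is not code from Ra itself.

```erlang
-module(counter_machine).
-behaviour(ra_machine).

-export([init/1, apply/3]).

%% init/1: the initial state is a plain integer. `initial` is a
%% hypothetical, application-specific configuration key.
init(Conf) ->
    maps:get(initial, Conf, 0).

%% apply/3: called for every command in the Raft log.
%% Returns {NewState, Reply}; no effects are needed here.
apply(_Meta, {add, N}, Count) when is_integer(N) ->
    NewCount = Count + N,
    {NewCount, NewCount};
apply(_Meta, _Other, Count) ->
    %% Unknown and builtin commands leave the state unchanged.
    {Count, {error, unknown_command}}.
```

Because apply/3 is a pure function of its arguments, every member that applies the same log arrives at the same counter value.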
Effects are used to separate the state machine logic from the side effects it wants to take inside its environment. Each call to the apply/3 function can return a list of effects for the leader to realise. This includes sending messages, setting up server and process monitors and calling arbitrary functions.
In other words, effects are everything that's not related to Raft state machine transitions.
Under normal operation only the leader that first applies an entry will attempt the effect. Followers process the same set of commands but simply throw away any effects returned by the state machine.
To ensure we do not re-issue effects on recovery each ra server persists its last_applied index. When the server restarts it replays its log until this point and throws away any resulting effects as they should already have been issued.
As the last_applied index is only persisted periodically there is a small chance that some effects may be issued multiple times when all the servers in the cluster fail at the same time. There is also a chance that effects will never be issued or reach their recipients. Ra makes no allowance for this.
It is worth taking this into account when implementing a state machine.
The Automatic Repeat Query (ARQ) protocol can be used to implement reliable communication (Erlang message delivery) given the above limitations.
A number of effects are available to the user.
The {send_msg, pid(), Msg :: term()} effect asynchronously sends a message to the specified pid.
ra uses erlang:send/3 with the noconnect and nosuspend options, which is the least reliable way of doing it. It does this so that a state machine send_msg effect will never block the main ra process.
To ensure message reliability, Automatic Repeat Query (ARQ)-like protocols between the state machine and the receiver should be implemented if needed.
The {monitor, process | node, pid() | node()} effect will ask the ra leader to monitor a process or node. If ra receives a DOWN for a process it is monitoring it will commit a {down, pid(), term()} command to the log that the state machine needs to handle. If it detects a monitored node as down or up it will commit a {nodeup | nodedown, node()} command to the log.
Use {demonitor, process | node, pid() | node()} to stop monitoring a process or a node.
All monitors are invalidated when the leader changes. State machines should re-issue monitor effects when becoming leader using the state_enter/2 callback.
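The send_msg and monitor effects described above can be sketched with a small subscription machine. The module name and the subscribe and publish commands are hypothetical, and the example assumes that state_enter/2 receives the atom leader when the server becomes leader and returns a list of effects.

```erlang
-module(notify_machine).
-behaviour(ra_machine).

-export([init/1, apply/3, state_enter/2]).

%% State: a list of subscriber pids.
init(_Conf) ->
    [].

apply(_Meta, {subscribe, Pid}, Subs) when is_pid(Pid) ->
    %% Ask the leader to monitor the new subscriber.
    {lists:usort([Pid | Subs]), ok, [{monitor, process, Pid}]};
apply(_Meta, {publish, Msg}, Subs) ->
    %% One send_msg effect per subscriber; delivery is best effort.
    {Subs, ok, [{send_msg, Pid, {notification, Msg}} || Pid <- Subs]};
apply(_Meta, {down, Pid, _Reason}, Subs) ->
    %% Committed by Ra when a monitored process goes down.
    {lists:delete(Pid, Subs), ok};
apply(_Meta, _Other, Subs) ->
    {Subs, ok}.

%% Monitors are lost on leader change, so re-issue them.
state_enter(leader, Subs) ->
    [{monitor, process, Pid} || Pid <- Subs];
state_enter(_RaftState, _Subs) ->
    [].
```

Since notifications may be lost or repeated, a subscriber that needs stronger guarantees would have to acknowledge messages and ask for retransmission.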
The {mod_call, module(), function(), Args :: [term()]} effect will ask the leader to call an arbitrary function. Care needs to be taken not to block the ra process whilst doing so. It is recommended that expensive operations are done in another process.
The mod_call effect is useful for e.g. updating an ETS table of committed entries or similar.
The {timer, Name :: term(), Time :: non_neg_integer() | infinity} effect asks the Ra leader to maintain a timer on behalf of the state machine and commit a timeout command when the timer triggers. If the time is set to infinity, the timer will not be started and any running timer with the same name will be cancelled.
The timer is relative and setting another timer with the same name before the current timer runs out results in the current timer being reset.
All timers are invalidated when the leader changes. State machines should re-issue timer effects when becoming leader using the state_enter/2 callback.
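As an illustration of the timer effect, the following sketch models a lease that expires unless it is renewed. The module, the commands and the lease duration are hypothetical, and the code assumes the committed timeout command has the shape {timeout, Name}.

```erlang
-module(lease_machine).
-behaviour(ra_machine).

-export([init/1, apply/3]).

-define(LEASE_MS, 5000). %% hypothetical lease duration

%% State: the current lease holder, or undefined.
init(_Conf) ->
    undefined.

apply(_Meta, {acquire, Owner}, undefined) ->
    %% Start a timer named `lease`.
    {Owner, ok, [{timer, lease, ?LEASE_MS}]};
apply(_Meta, {acquire, _Owner}, Holder) ->
    {Holder, {error, {held_by, Holder}}};
apply(_Meta, {renew, Owner}, Owner) ->
    %% Setting a timer with the same name resets it.
    {Owner, ok, [{timer, lease, ?LEASE_MS}]};
apply(_Meta, {release, Owner}, Owner) ->
    %% `infinity` cancels the running timer.
    {undefined, ok, [{timer, lease, infinity}]};
apply(_Meta, {timeout, lease}, _Holder) ->
    %% Assumed shape of the timeout command: the lease expired.
    {undefined, ok};
apply(_Meta, _Other, Holder) ->
    {Holder, {error, rejected}}.
```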
Use {log, Indexes :: [ra_index()], fun(([user_command()]) -> effects())} to read commands from the log at the specified indexes and return a list of effects. Effectively this effect transforms log entries into effects.
A potential use case is a command that contains large binary data which you don't want to keep in memory but would rather load on demand when it is needed for a side effect.
This is an advanced feature and will only work as long as the command is still in the log. If a release_cursor has been emitted with an index higher than this, the command may no longer be in the log and the function will not be called.
There is currently no facility for reading partial data from a snapshot.
The {release_cursor, RaftIndex, MachineState} effect can be used to give Ra cluster members a hint to trigger a snapshot.
This effect, when emitted, is evaluated on all nodes and not just the leader.
It is not guaranteed that a snapshot will be taken. A decision to take a snapshot or to delay it is taken using a number of internal Ra state factors. The goal is to minimise disk I/O activity when possible.
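A state machine might emit the release_cursor effect at regular intervals, using the Raft index from the command metadata. The sketch below does this for a hypothetical key-value machine; the module name, the put command and the interval of 1000 entries are assumptions made for the example.

```erlang
-module(kv_machine).
-behaviour(ra_machine).

-export([init/1, apply/3]).

-define(SNAPSHOT_EVERY, 1000). %% hypothetical interval

init(_Conf) ->
    #{}.

apply(#{index := Idx}, {put, Key, Value}, State0) ->
    State = maps:put(Key, Value, State0),
    {State, ok, maybe_release_cursor(Idx, State)};
apply(_Meta, _Other, State) ->
    {State, ok}.

%% Hint that everything up to Idx is captured by State, so a
%% snapshot could be taken. Ra may still decide not to take one.
maybe_release_cursor(Idx, State) when Idx rem ?SNAPSHOT_EVERY =:= 0 ->
    [{release_cursor, Idx, State}];
maybe_release_cursor(_Idx, _State) ->
    [].
```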
It is eventually necessary to make changes to the state machine code. Any changes to a state machine that would result in a different end state when the state is re-calculated from the log of entries (as is done when restarting a ra server) should be considered breaking.
As Ra state machines need to be deterministic, any changes to the logic inside the apply/3 function need to be enabled at the same index on all members of a Ra cluster.
Ra considers all state machines versioned starting with version 0. State machines that need to be updated with breaking changes need to implement the optional versioning parts of the ra_machine behaviour:
-type version() :: non_neg_integer().
-callback version() -> pos_integer().
-callback which_module(version()) -> module().
version/0 returns the current version which is an integer that is higher than any previously used version number. Whenever a breaking change is made this should be incremented.
which_module/1 maps a version to the module implementing it. This allows developers to optionally keep entire modules for old versions instead of trying to handle multiple versions in the same module.
E.g. when moving from version 0 of my_machine to version 1:
- Copy and rename the my_machine module to my_machine_v0
- Implement the breaking changes in the original module and bump the version.
version() -> 1.
which_module(1) -> my_machine;
which_module(0) -> my_machine_v0.
This would ensure that any entries added to the log are applied against the active machine version at the time they were added, leading to a deterministic outcome.
For smaller (but still breaking) changes that can be handled in the original module it is also possible to switch based on the machine_version key included in the meta data passed to apply/3.
New versions are enabled whenever there is a quorum of members with a higher version and one of them is elected leader. The leader will commit the new version to the log and each follower will move to the new version when this log entry is applied. Followers that do not yet have the new version available will receive log entries from the leader and update their logs but will not apply log entries. When they are upgraded and have the new version, all outstanding log entries will be applied. In practical terms this means that Ra nodes can be upgraded one by one.
In order to be upgradeable, the state machine implementation will need to handle the version bump in the form of a command that is passed to the apply/3 callback: {machine_version, OldVersion, NewVersion}. This provides an opportunity to transform the state data into a new form, if needed. Note that the version bump may be for several versions so it may be necessary to handle multiple state transformations.
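Handling the version bump command could look like the sketch below. It assumes, purely for illustration, that version 0 of my_machine kept a bare integer as its state and that version 1 moves to a map; the state layout and the add command are hypothetical.

```erlang
-module(my_machine).
-behaviour(ra_machine).

-export([init/1, apply/3, version/0, which_module/1]).

version() -> 1.

which_module(1) -> my_machine;
which_module(0) -> my_machine_v0.

%% Version 1 state (hypothetical): a map with a counter and a
%% history list. Version 0 state was a bare integer.
init(_Conf) ->
    #{count => 0, history => []}.

%% The version bump arrives as a regular command and is the
%% place to transform the old state into the new form.
apply(_Meta, {machine_version, 0, 1}, OldCount) when is_integer(OldCount) ->
    {#{count => OldCount, history => []}, ok};
apply(_Meta, {add, N}, #{count := C, history := H} = State) ->
    {State#{count := C + N, history := [N | H]}, ok};
apply(_Meta, _Other, State) ->
    {State, ok}.
```

A machine with more than two versions would need one transformation per step and would chain them when the bump spans several versions.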
Ra does not support the Erlang hot code swapping mechanism.
There are two approaches to forming a cluster:
- All cluster members can be known ahead of time
- Cluster members can join existing members dynamically (this implies that one "seed" member is chosen and started first)
When all cluster members are known ahead of time, use ra:start_or_restart_cluster/3 on one of the nodes to set up a cluster. This will either create a new cluster or restart an existing one.
As cluster membership is persisted in Ra logs, newly added nodes will be discovered from the log.
With the dynamic approach there has to be a "preconfigured" or "seed" node that other nodes will join. Nodes that join the seed later will discard all their state before joining.
The Ra API supports two options: the joining node can either have existing state (in which case it has to be discarded with ra:force_delete_server/1) or be a completely fresh node.
Start a local server with ra:start_server/1, then call ra:add_member/2 on an existing cluster member.
The newly started node joins the existing cluster as a follower and replicates the existing log, after which it will be a regular member of the cluster.
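The two approaches can be sketched with a small helper module. The module and its function names are hypothetical. The ra calls use the arities named in the text, which may differ between Ra versions, and the machine argument is assumed to be a machine configuration such as {module, Mod, InitArgs}.

```erlang
-module(cluster_setup).

-export([server_ids/2, start/3, join/3]).

%% Build one {Name, Node} server ID per Erlang node.
server_ids(Name, Nodes) ->
    [{Name, Node} || Node <- Nodes].

%% All members known ahead of time: create the cluster, or
%% restart it if it already exists.
start(ClusterName, Machine, ServerIds) ->
    ra:start_or_restart_cluster(ClusterName, Machine, ServerIds).

%% Dynamic join: start the local server from its configuration,
%% then ask an existing member to add it to the cluster.
join(ExistingMember, NewServerId, ServerConf) ->
    ok = ra:start_server(ServerConf),
    ra:add_member(ExistingMember, NewServerId).
```

Only server_ids/2 can be tried without a running Ra application; the other two functions are thin wrappers that need real nodes.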
Since Ra assumes multiple clusters running side by side in a given Erlang cluster, each cluster and cluster member must have their own identities.
A cluster name in Ra is defined as binary() | string() | atom().
The cluster name is mostly a "human-friendly" name for a Ra cluster: something that identifies the entity the cluster is meant to represent. The cluster name isn't strictly part of a cluster's identity.
For example, in the case of RabbitMQ's quorum queues, cluster names are derived from the queue's identity.
A Ra server is a Ra cluster member. Server ID is defined as a pair of {atom(), node()}.
Server ID combines a locally registered name and the Erlang node it resides on.
Since server IDs identify Ra cluster members, they need to be a persistent addressable value that can be used as a target for sending messages. A pid() would not work as it isn't persisted across process restarts.
While it's more common for each server within a Ra cluster to be started on a separate Erlang node, ra also supports the scenario where multiple Ra servers within a cluster are placed on the same Erlang node.
Ra servers are locally registered processes to avoid depending on a distributed registry (such as the global module) which does not provide the same consistency and recovery guarantees.
Each Ra server also needs an ID that is unique to the local Erlang node and unique across incarnations of ra clusters with the same cluster ID.
This is to handle the case where a Ra cluster with the same name is deleted and then re-created with the same cluster id and server ids shortly after. In this instance the write-ahead log may contain entries from the previous incarnation which means we could be mixing entries written in the previous incarnation with ones written in the current incarnation which obviously is unacceptable. Hence providing a unique local identity is critical for correct operation.
The IDs are used to identify a Ra server's data on disk and must be filename-safe and conform to the base64url standard.
Ra cluster members also use IDs that are unique to that member. The IDs assume Base64 URI-encoded binaries that can be safely used as directory and file name values on disk.
When ra:start_server/4 or ra:start_cluster/3 is invoked, a UID is automatically generated by Ra.
This is used for interactions with the write-ahead log, segment and snapshot writer processes, which use the ra_directory module to look up the current pid() for a given ID.
A UID can be user-provided. Values must conform to the base64url standard.
Here's an example of a compliant user-provided UID:
Config = #{cluster_name => <<"ra-cluster-1">>,
server_id => {ra_cluster_1, ra1@snowman},
uid => <<"ra_cluster_1_1519808362841">>
...},
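A user-provided UID in the style of the example above could be built and checked with a helper like the following. The module and function names are hypothetical. A millisecond timestamp alone does not guarantee uniqueness if two servers with the same prefix are created within the same millisecond, so treat this as a sketch only.

```erlang
-module(uid_helper).

-export([make_uid/1, is_filename_safe/1]).

%% Build a UID from a prefix and the current time in milliseconds.
make_uid(Prefix) when is_binary(Prefix) ->
    Ts = integer_to_binary(erlang:system_time(millisecond)),
    <<Prefix/binary, "_", Ts/binary>>.

%% True if the binary is non-empty and only uses the base64url
%% alphabet: A-Z, a-z, 0-9, '-' and '_'.
is_filename_safe(<<>>) ->
    false;
is_filename_safe(Bin) when is_binary(Bin) ->
    lists:all(fun is_safe_char/1, binary_to_list(Bin)).

is_safe_char(C) when C >= $A, C =< $Z -> true;
is_safe_char(C) when C >= $a, C =< $z -> true;
is_safe_char(C) when C >= $0, C =< $9 -> true;
is_safe_char($-) -> true;
is_safe_char($_) -> true;
is_safe_char(_) -> false.
```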
Ra implements the single server cluster membership change strategy covered in Consensus: Bridging Theory and Practice. In practice that means that join and leave requests are processed sequentially one by one.
The functions that manage cluster members, ra:add_member/2 and ra:remove_member/2, will return as soon as the membership change state transition has been written to the log and the leader has switched to the new configuration.
To interact with a Ra cluster, client processes need to first discover and subsequently track the leader of the cluster.
Synchronous commands such as ra:process_command/2 and ra:members/1 always redirect to the current leader (if known) and return the leader's server id as the 3rd item in the success response. The client process can then use this return value to avoid the redirection overhead for future calls.
When using the asynchronous API such as ra:pipeline_command/2|3|4 it is best if the client first uses a synchronous call such as ra:members/1 to discover the current leader and direct asynchronous commands to this member.
Alternatively clients can use the ra_leaderboard:lookup_leader/2 function to look up the locally known current leader of the cluster by its cluster name. If there is no local member of the Ra cluster or the leader is not known this function returns undefined. In this case the client process should fall back to synchronous discovery.
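A client can keep track of the leader by remembering the third element of each successful reply. The wrapper below is a hypothetical sketch: the module name and the shape of its return values are assumptions, and the function that sends the command is injectable so the logic can be tried without a running cluster.

```erlang
-module(leader_tracker).

-export([command/2, command/3]).

%% Send Cmd to Target and return the server to use next time.
command(Target, Cmd) ->
    command(fun ra:process_command/2, Target, Cmd).

%% SendFun is injectable so the logic can be tried without a
%% running cluster.
command(SendFun, Target, Cmd) ->
    case SendFun(Target, Cmd) of
        {ok, Reply, Leader} ->
            %% Third element: the current leader's server ID.
            {ok, Reply, Leader};
        Other ->
            %% Keep the old target; the caller may retry or
            %% rediscover the leader, e.g. with ra:members/1.
            {error, Other, Target}
    end.
```

The caller threads the returned server ID into its next call, so that only the first command after a leader change pays for a redirect.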
Ra aims to fit well within the Erlang environment as well as provide good adaptive throughput. Therefore it has deviated from the original Raft protocol in certain areas.
Log replication in Ra is mostly asynchronous, so there is no actual use of RPC calls (as in the Raft paper). New entries are pipelined and followers reply after receiving a written event, which has a natural batching effect on the replies.
Followers include 3 non-standard fields in their Raft AppendEntries RPC replies:
- last_index, last_term - the index and term of the last fully written entry. The leader uses these to calculate the new commit_index.
- next_index - this is the next index the follower expects. For successful replies it is not set, or is ignored by the leader. It is set for unsuccessful replies and is used by the leader to update its next_index for the follower and resend entries from this point.
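How a leader could derive a commit index from the reported last_index values can be sketched with the standard Raft majority rule. This is an illustration rather than Ra's actual code; the module name is hypothetical, and the additional Raft requirement that the entry at that index belongs to the current term is left out.

```erlang
-module(commit_index).

-export([quorum_index/1]).

%% LastIndexes: last fully written index of every member,
%% the leader included. Returns the highest index that a
%% majority of members have written.
quorum_index(LastIndexes) when LastIndexes =/= [] ->
    Sorted = lists:sort(fun(A, B) -> A >= B end, LastIndexes),
    Quorum = length(LastIndexes) div 2 + 1,
    lists:nth(Quorum, Sorted).
```

With three members reporting 10, 7 and 5, two of them have written index 7, so 7 is the highest index that is safe to commit.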
To avoid completely overwhelming a slow follower the leader will only pipeline if the difference between the next_index and match_index is below some limit (currently set to 1000).
Followers that are considered stale (i.e. the match_index is less than next_index - 1) are still sent an append entries message periodically, although less frequently than recommended in the Raft paper. This is done to ensure follower liveness. In an idle system where all followers are in sync no further messages will be sent to reduce network bandwidth usage.
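The two conditions above translate into very small predicates. The module and function names below are hypothetical; only the limit of 1000 and the staleness condition come from the text.

```erlang
-module(follower_flow).

-export([can_pipeline/2, is_stale/2]).

-define(MAX_PIPELINE_DISTANCE, 1000). %% limit quoted in the text

%% Pipeline only while the follower is not too far behind.
can_pipeline(NextIndex, MatchIndex) ->
    NextIndex - MatchIndex < ?MAX_PIPELINE_DISTANCE.

%% A follower is stale when it has not confirmed everything
%% that was sent to it.
is_stale(NextIndex, MatchIndex) ->
    MatchIndex < NextIndex - 1.
```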
Ra doesn't rely on the Raft paper's approach to peer failure detection where the leader periodically sends append entries messages to enforce its leadership.
Ra is designed to support potentially thousands of concurrently running clusters within an Erlang cluster and having all these doing their own failure detection has proven excessive in terms of bandwidth usage.
In Ra, leaders will not send append entries unless there is an update to be sent. It means followers don't (typically) set election timers.
This leaves the question of how failures are detected and elections are triggered.
Ra tries to make as much use of native Erlang failure detection facilities as it can. Process or node failure scenarios are handled using Erlang monitors. Followers monitor the currently elected leader and if they receive a 'DOWN' message, as they would in the case of a crash or a sustained network partition where Erlang distribution detects that a node isn't replying, the follower then sets a short, randomised election timeout.
This only works well in crash-stop scenarios. For network partition scenarios Ra could rely on Erlang distribution's net ticks mechanism to detect the partition but this could easily take 30-60 seconds by default to happen which is too slow.
This is why the ra application uses Aten, a separate node failure detection library developed alongside it.
Aten monitors Erlang nodes. When it suspects an Erlang node is down it notifies local ra servers of this. If this Erlang node hosts the currently known ra leader the follower will start an election.
Ra implements a "pre-vote" member state that sits between the "follower" and "candidate" states. This avoids cluster disruptions due to leader failure false positives.
Ra is designed to support potentially thousands of Ra servers per Erlang node. As all data needs to be safely persisted and fsync-ed to disk before it can be actioned it is not practical or performant to have each server write log entries to their own log files. Ra's log design instead is optimised to avoid parallel calls to fsync(1) and thus needs to funnel all log entry writes through a common component, named the write-ahead log (although strictly speaking there is no "ahead" in this sense).
All messages are sent asynchronously to maximise throughput with ARQ-like protocols where needed.
There are several processes involved in the Ra log implementation.
The Ra server initiates all write operations and coordinates all notifications received from the other processes. It is usually the only process that reads from the log, even if it is possible to attach external readers to the log.
Under normal operations reads from the log are done from in-memory ETS tables but it may occasionally read from log segments on disk, particularly when recovering after a restart. These ETS tables are maintained by the write-ahead log component.
This is the process that's first to accept all write operations.
The storage of the WAL is made of 2 parts:
- a file where the entries of all Ra servers are written.
- in-memory ETS tables that contain and will serve the entries for individual Ra servers.
The WAL appends entries from all Ra servers running on the current Erlang node to a single file and calls fsync(1) after a certain number of writes have been performed or there are no further write operations pending in its mailbox.
After each batch it notifies each Ra server that had a write in that batch so that it can consider the entry written.
The WAL tries to adapt the load by dynamically increasing the max number of writes per batch, trading latency for throughput.
Depending on the system a call to fsync(1) can block for several milliseconds. Typically the size of a WAL batch is tuned to the number of messages received during the fsync(1) call. The WAL then quickly writes all pending entries to disk and calls fsync(1), during which further writes accumulate, and so on.
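The batching behaviour can be illustrated with a receive loop that drains whatever writes are already waiting in the mailbox. This is a simplified sketch, not the actual WAL code: the module name and the {write, From, Entry} message shape are hypothetical.

```erlang
-module(wal_batch).

-export([collect/1]).

%% Block for the first write, then drain up to Max pending
%% {write, From, Entry} messages without waiting.
collect(Max) when Max > 0 ->
    receive
        {write, From, Entry} ->
            drain(Max - 1, [{From, Entry}])
    end.

drain(0, Acc) ->
    lists:reverse(Acc);
drain(N, Acc) ->
    receive
        {write, From, Entry} ->
            drain(N - 1, [{From, Entry} | Acc])
    after 0 ->
        %% Mailbox is empty: the batch is complete.
        lists:reverse(Acc)
    end.
```

A writer process would append the collected batch to the file, sync once, notify each sender, and then call collect/1 again to pick up the writes that arrived in the meantime.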
The WAL file is optimised for writing because it is only read on recovery (when the system has stopped or crashed and the last known state of Ra servers needs to be restored).
The WAL writes each entry to a per-Ra-server ETS table (similar to Cassandra and RocksDB MemTables, see Log Structured Storage) which the Ra server uses to read data from its log.
So the ETS tables are a way to access entries, but they are not the only place where entries are stored though (more on this later).
The ETS tables are typically used by Ra servers once entries have been replicated and got consensus. Ra servers look up confirmed entries with their index and apply them to their state machine.
Ra servers also maintain a short-lived cache of their entries which can be used whenever an entry needs to be looked up. This cache is pruned when an entry is confirmed by the WAL (meaning, fully written to disk). Depending on the timing, an entry can be looked up from this cache if it is there (the fastest) or from an ETS table. An example of the usage of this cache is when the leader gets consensus for an entry before its own WAL has confirmed it: the entry is looked up from the cache and applied to the state machine. We'll see later that entries can be stored in yet another place, segment files. So whenever an entry is needed, Ra will go through these 3 storage mechanisms to look it up: cache, ETS tables, segment files.
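The lookup order of cache, ETS tables and segment files can be expressed as a fall-through over an ordered list of sources. The sketch below is an illustration only: the module name is hypothetical and each source is modelled as a fun that returns {ok, Entry} or not_found.

```erlang
-module(entry_lookup).

-export([lookup/2]).

%% Sources is an ordered list of {Name, LookupFun} pairs, e.g.
%% cache, then ETS mem table, then segment files. Each LookupFun
%% returns {ok, Entry} or not_found.
lookup(_Index, []) ->
    not_found;
lookup(Index, [{Name, LookupFun} | Rest]) ->
    case LookupFun(Index) of
        {ok, Entry} -> {ok, Name, Entry};
        not_found -> lookup(Index, Rest)
    end.
```

Ordering the sources from fastest to slowest means the disk is only touched when an entry is in neither of the in-memory stores.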
To avoid perpetually appending to an ever growing file the WAL periodically "rolls over" to a new file. The old file and the mem tables written during the lifetime of that file are passed to the segment writer process that is responsible for flushing the mem tables to per-Ra-server-specific on-disk storage. The data is stored in variably sized segments with a fixed number of log entries.
When the segment writer has finished flushing all the ETS tables for all the Ra servers that wrote to the WAL file it deletes the WAL file (and the ETS tables as well.)
The segment writer keeps track of the most recent snapshot index for each Ra server; it then uses that information to avoid performing writes for entries that potentially have been truncated.
Ra distinguishes between open and closed WAL's ETS tables. A Ra server can have only 1 (or none) open ETS table at a time. This open ETS table contains the entries that are in the current WAL file.
During WAL rollover, open ETS tables become closed because they won't be appended to anymore. Closed ETS tables only exist while the segment writer is flushing them to disk. Once their content is on disk (in segments), they are deleted.
Because the segment writer may have a backlog, there can be many closed ETS tables, but typically there are none.
So to read from an (open) ETS table, there are at least 2 ETS lookups: one on the ra_log_open_mem_tables table to get the current range of the open tables and then another one to get the actual entry from the table itself.
Ra servers are in charge of the deletion of closed ETS tables. The reason for this is that a Ra server may be between a range lookup and a table read, so another process like the segment writer cannot delete the table meanwhile.
So after a flush, the segment writer sends information to Ra servers, like the list of the new and updated segments, and which ETS tables were used for the update. The Ra server can then delete these tables safely.
If entries from these closed and deleted tables are needed later, they will be read from segments directly.
Ra servers offload the work of persisting snapshots to the snapshot writer process so that a large snapshot write does not block the Ra server unnecessarily. After a Ra server has written a snapshot it can delete any segments that only contain entries with a lower index than the snapshot.
A simplified view of the lifetime of a single write.
Any process can register to be notified about log events (new segments, lower bound), it can then perform some queries for its own needs, typically reading entries.
One potential use case of external log readers is to offload some busy processes (e.g. AMQP queues) and let other processes (e.g. AMQP channels) re-read entries, to avoid slowing down critical processes too much.
External log readers can also be used to implement multi-DC log replication. Those readers could even only read and replicate committed entries.
Recovery is the process of restarting after the system has stopped (normally or abruptly).
When the WAL starts up, it scans its directory for WAL files. It reads each WAL file from start to finish to rebuild the ETS tables that would have existed and then passes them to the segment writer to flush them. It does this to make sure all entries are in their respective server segments.
If the system crashed while flushing to segments this does mean some entries may be re-written into segments, but this is better than losing them. Those duplicated entries will be de-duplicated later on anyway.
Then Ra servers read their respective segments and re-apply entries to get to the last state of their state machine.
TBD: currently undergoing changes