Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add Raft::ensure_linearizable() to ensure linearizable read #964

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl R

#[post("/consistent_read")]
pub async fn consistent_read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl Responder> {
let ret = app.raft.is_leader().await;
let ret = app.raft.ensure_linearizable().await;

match ret {
Ok(_) => {
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn read(mut req: Request<Arc<App>>) -> tide::Result {
}

async fn consistent_read(mut req: Request<Arc<App>>) -> tide::Result {
let ret = req.state().raft.is_leader().await;
let ret = req.state().raft.ensure_linearizable().await;

match ret {
Ok(_) => {
Expand Down
10 changes: 8 additions & 2 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,14 @@ pub type MemNodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for `MemStore`.
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
pub TypeConfig:
D = ClientRequest,
R = ClientResponse,
NodeId = MemNodeId,
Node = (),
Entry = Entry<TypeConfig>,
SnapshotData = Cursor<Vec<u8>>,
AsyncRuntime = TokioRuntime
);

/// The application snapshot type which the `MemStore` works with.
Expand Down
26 changes: 17 additions & 9 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use crate::core::notify::Notify;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ClientReadTx;
use crate::core::raft_msg::ClientWriteTx;
use crate::core::raft_msg::InstallSnapshotTx;
use crate::core::raft_msg::RaftMsg;
Expand All @@ -45,7 +46,6 @@
use crate::engine::Respond;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
Expand Down Expand Up @@ -263,12 +263,19 @@
// TODO: the second condition is such a read request can only read from state machine only when the last log it sees
// at `T1` is committed.
#[tracing::instrument(level = "trace", skip(self, tx))]
pub(super) async fn handle_check_is_leader_request(
&mut self,
tx: ResultSender<(), CheckIsLeaderError<C::NodeId, C::Node>>,
) {
pub(super) async fn handle_check_is_leader_request(&mut self, tx: ClientReadTx<C>) {
// Setup sentinel values to track when we've received majority confirmation of leadership.

let resp = {
let read_log_id = self.engine.state.get_read_log_id().copied();

// TODO: this applied is a little stale when being returned to client.
// Fix this when the following heartbeats are replaced with calling RaftNetwork.
let applied = self.engine.state.io_applied().copied();

(read_log_id, applied)
};

let my_id = self.id;
let my_vote = *self.engine.state.vote_ref();
let ttl = Duration::from_millis(self.config.heartbeat_interval);
Expand All @@ -278,7 +285,7 @@
let mut granted = btreeset! {my_id};

if eff_mem.is_quorum(granted.iter()) {
let _ = tx.send(Ok(()));
let _ = tx.send(Ok(resp));
return;
}

Expand Down Expand Up @@ -351,7 +358,7 @@
}
};

// If we receive a response with a greater term, then revert to follower and abort this
// If we receive a response with a greater vote, then revert to follower and abort this
// request.
if let AppendEntriesResponse::HigherVote(vote) = append_res {
debug_assert!(
Expand All @@ -368,7 +375,7 @@
});

if let Err(_e) = send_res {
tracing::error!("fail to send HigherVote to raft core");
tracing::error!("fail to send HigherVote to RaftCore");
}

// we are no longer leader so error out early
Expand All @@ -380,7 +387,7 @@
granted.insert(target);

if eff_mem.is_quorum(granted.iter()) {
let _ = tx.send(Ok(()));
let _ = tx.send(Ok(resp));
return;
}
}
Expand All @@ -395,7 +402,8 @@
.into()));
};

// TODO: do not spawn, manage read requests with a queue by RaftCore
C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (memstore)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / cluster-benchmark (cluster_benchmark)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (sledstore)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, loosen-follower-log-revert)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / external-stores (stores/rocksstore-v2)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore-compat07)

unused implementer of `futures::Future` that must be used
}

/// Submit change-membership by writing a Membership log entry.
Expand Down Expand Up @@ -998,7 +1006,7 @@
let id = self.id;
let option = RPCOption::new(ttl);

C::AsyncRuntime::spawn(

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (memstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / cluster-benchmark (cluster_benchmark)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (sledstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, loosen-follower-log-revert)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / external-stores (stores/rocksstore-v2)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1009 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore-compat07)

unused implementer of `futures::Future` that must be used
async move {
let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await;
let res = match tm_res {
Expand Down
12 changes: 9 additions & 3 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::NodeOf;
use crate::ChangeMembers;
use crate::MessageSummary;
use crate::RaftTypeConfig;
Expand All @@ -35,8 +38,11 @@ pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>, Infallible>;
pub(crate) type AppendEntriesTx<NID> = ResultSender<AppendEntriesResponse<NID>, Infallible>;

/// TX for Client Write Response
pub(crate) type ClientWriteTx<C> =
ResultSender<ClientWriteResponse<C>, ClientWriteError<<C as RaftTypeConfig>::NodeId, <C as RaftTypeConfig>::Node>>;
pub(crate) type ClientWriteTx<C> = ResultSender<ClientWriteResponse<C>, ClientWriteError<NodeIdOf<C>, NodeOf<C>>>;

/// TX for Linearizable Read Response
pub(crate) type ClientReadTx<C> =
ResultSender<(Option<LogIdOf<C>>, Option<LogIdOf<C>>), CheckIsLeaderError<NodeIdOf<C>, NodeOf<C>>>;

/// A message sent by application to the [`RaftCore`].
///
Expand Down Expand Up @@ -65,7 +71,7 @@ where C: RaftTypeConfig
},

CheckIsLeaderRequest {
tx: ResultSender<(), CheckIsLeaderError<C::NodeId, C::Node>>,
tx: ClientReadTx<C>,
},

Initialize {
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/docs/protocol/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! The protocol used by Openraft to replicate data.

pub mod read {
#![doc = include_str!("read.md")]
}

pub mod replication {
#![doc = include_str!("replication.md")]

Expand Down
74 changes: 74 additions & 0 deletions openraft/src/docs/protocol/read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Read Operations

Read operations within a Raft cluster are guaranteed to be linearizable.

This ensures that for any two read operations,
`A` and `B`, if `B` occurs after `A` by wall clock time,
then `B` will observe the same state as `A` or any subsequent state changes made by `A`,
regardless of which node within the cluster each operation is performed on.

In Openraft `read_index` is the same as the `read_index` in the original raft paper.
Openraft also use `read_log_id` instead of `read_index`.

## Ensuring linearizability

To ensure linearizability, read operations must perform a [`get_read_log_id()`] operation on the leader before proceeding.

This method confirms that this node is the leader at the time of invocation by sending heartbeats to a quorum of followers, and returns `(read_log_id, last_applied_log_id)`:
- `read_log_id` represents the log id up to which the state machine should apply to ensure a
linearizable read,
- `last_applied_log_id` is the last applied log id.

The caller then wait for `last_applied_log_id` to catch up `read_log_id`, which can be done by subscribing to [`Raft::metrics`],
and at last, proceed with the state machine read.

The above steps are encapsulated in the [`ensure_linearizable()`] method.

## Examples

```ignore
my_raft.ensure_linearizable().await?;
proceed_with_state_machine_read();
```

The above snippet does the same as the following:

```ignore
let (read_log_id, applied) = self.get_read_log_id().await?;

if read_log_id.index() > applied.index() {
self.wait(None).applied_index_at_least(read_log_id.index(), "").await?
}

proceed_with_state_machine_read();
```

The comparison `read_log_id > applied_log_id` would also be valid in the above example.


## Ensuring Linearizability with `read_log_id`

The `read_log_id` is determined as the maximum of the `last_committed_log_id` and the
log id of the first log entry in the current leader's term (the "blank" log entry).

Assumes another earlier read operation reads from state machine with up to log id `A`.
Since the leader has all committed entries up to its initial blank log entry,
we have: `read_log_id >= A`.

When the `last_applied_log_id` meets or exceeds `read_log_id`,
the state machine contains all state upto `A`. Therefore, a linearizable read is assured
when `last_applied_log_id >= read_log_id`.


## Ensuring Linearizability with `read_index`

And it is also legal by comparing `last_applied_log_id.index() >= read_log_id.index()`
due to the guarantee that committed logs will not be lost.

Since a previous read could only have observed committed logs, and `read_log_id.index()` is
at least as large as any committed log, once `last_applied_log_id.index() >= read_log_id.index()`, the state machine is assured to reflect all entries seen by any past read.


[`ensure_linearizable()`]: crate::Raft::ensure_linearizable
[`get_read_log_id()`]: crate::Raft::get_read_log_id
[`Raft::metrics`]: crate::Raft::metrics
22 changes: 19 additions & 3 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ impl<NID: NodeId> LogIdList<NID> {
/// Get the log id at the specified index.
///
/// It will return `last_purged_log_id` if index is at the last purged index.
#[allow(dead_code)]
pub(crate) fn get(&self, index: u64) -> Option<LogId<NID>> {
let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index));

Expand All @@ -285,17 +284,34 @@ impl<NID: NodeId> LogIdList<NID> {
}
}

#[allow(dead_code)]
pub(crate) fn first(&self) -> Option<&LogId<NID>> {
self.key_log_ids.first()
}

#[allow(dead_code)]
pub(crate) fn last(&self) -> Option<&LogId<NID>> {
self.key_log_ids.last()
}

pub(crate) fn key_log_ids(&self) -> &[LogId<NID>] {
&self.key_log_ids
}

/// Returns key log ids appended by the last leader.
///
/// Note that the 0-th log does not belong to any leader(but a membership log to initialize a
/// cluster) but this method does not differentiate between them.
pub(crate) fn by_last_leader(&self) -> &[LogId<NID>] {
let ks = &self.key_log_ids;
let l = ks.len();
if l < 2 {
return ks;
}

// There are at most two(adjacent) key log ids with the same leader_id
if ks[l - 1].leader_id() == ks[l - 2].leader_id() {
&ks[l - 2..]
} else {
&ks[l - 1..]
}
}
}
27 changes: 26 additions & 1 deletion openraft/src/engine/tests/log_id_list_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::engine::LogIdList;
use crate::testing::log_id;

#[test]
fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> {
Expand Down Expand Up @@ -350,4 +351,28 @@ fn test_log_id_list_get_log_id() -> anyhow::Result<()> {

Ok(())
}
use crate::testing::log_id;

#[test]
fn test_log_id_list_by_last_leader() -> anyhow::Result<()> {
// len == 0
let ids = LogIdList::<u64>::default();
assert_eq!(ids.by_last_leader(), &[]);

// len == 1
let ids = LogIdList::<u64>::new([log_id(1, 1, 1)]);
assert_eq!(&[log_id(1, 1, 1)], ids.by_last_leader());

// len == 2, the last leader has only one log
let ids = LogIdList::<u64>::new([log_id(1, 1, 1), log_id(3, 1, 3)]);
assert_eq!(&[log_id(3, 1, 3)], ids.by_last_leader());

// len == 2, the last leader has two logs
let ids = LogIdList::<u64>::new([log_id(1, 1, 1), log_id(1, 1, 3)]);
assert_eq!(&[log_id(1, 1, 1), log_id(1, 1, 3)], ids.by_last_leader());

// len > 2, the last leader has only more than one logs
let ids = LogIdList::<u64>::new([log_id(1, 1, 1), log_id(7, 1, 8), log_id(7, 1, 10)]);
assert_eq!(&[log_id(7, 1, 8), log_id(7, 1, 10)], ids.by_last_leader());

Ok(())
}
Loading
Loading