Skip to content

Commit

Permalink
Merge pull request #973 from drmingdrmer/33-no-client-channel-drop
Browse files Browse the repository at this point in the history
Refactor: respond to client even when leader is switched
  • Loading branch information
drmingdrmer authored Dec 19, 2023
2 parents aebd390 + ea01728 commit 3aa6b98
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 36 deletions.
2 changes: 1 addition & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl IntoMemClientRequest<ClientRequest> for ClientRequest {

/// The application data response type which the `MemStore` works with.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClientResponse(Option<String>);
pub struct ClientResponse(pub Option<String>);

pub type MemNodeId = u64;

Expand Down
60 changes: 34 additions & 26 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
Expand Down Expand Up @@ -134,9 +135,6 @@ impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
///
/// It is created when RaftCore enters leader state, and will be dropped when it quits leader state.
pub(crate) struct LeaderData<C: RaftTypeConfig> {
/// Channels to send result back to client when logs are committed.
pub(crate) client_resp_channels: BTreeMap<u64, ClientWriteTx<C>>,

/// A mapping of node IDs to the replication state of the target node.
// TODO(xp): make it a field of RaftCore. it does not have to belong to leader.
// It requires the Engine to emit correct add/remove replication commands
Expand All @@ -149,7 +147,6 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
client_resp_channels: Default::default(),
replications: BTreeMap::new(),
next_heartbeat: <C::AsyncRuntime as AsyncRuntime>::Instant::now(),
}
Expand Down Expand Up @@ -184,6 +181,9 @@ where

pub(crate) engine: Engine<C>,

/// Channels to send result back to client when logs are applied.
pub(crate) client_resp_channels: BTreeMap<u64, ClientWriteTx<C>>,

pub(crate) leader_data: Option<LeaderData<C>>,

#[allow(dead_code)]
Expand Down Expand Up @@ -468,9 +468,7 @@ where

// Install callback channels.
if let Some(tx) = tx {
if let Some(l) = &mut self.leader_data {
l.client_resp_channels.insert(index, tx);
}
self.client_resp_channels.insert(index, tx);
}

true
Expand Down Expand Up @@ -711,17 +709,15 @@ where
pub(crate) fn handle_apply_result(&mut self, res: ApplyResult<C>) {
tracing::debug!(last_applied = display(res.last_applied), "{}", func_name!());

if let Some(l) = &mut self.leader_data {
let mut results = res.apply_results.into_iter();
let mut applying_entries = res.applying_entries.into_iter();
let mut results = res.apply_results.into_iter();
let mut applying_entries = res.applying_entries.into_iter();

for log_index in res.since..res.end {
let ent = applying_entries.next().unwrap();
let apply_res = results.next().unwrap();
let tx = l.client_resp_channels.remove(&log_index);
for log_index in res.since..res.end {
let ent = applying_entries.next().unwrap();
let apply_res = results.next().unwrap();
let tx = self.client_resp_channels.remove(&log_index);

Self::send_response(ent, apply_res, tx);
}
Self::send_response(ent, apply_res, tx);
}
}

Expand Down Expand Up @@ -1538,16 +1534,6 @@ where
self.leader_data = Some(LeaderData::new());
}
Command::QuitLeader => {
if let Some(l) = &mut self.leader_data {
// Leadership lost, inform waiting clients
let chans = std::mem::take(&mut l.client_resp_channels);
for (_, tx) in chans.into_iter() {
let _ = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id: None,
leader_node: None,
})));
}
}
self.leader_data = None;
}
Command::AppendEntry { entry } => {
Expand Down Expand Up @@ -1584,6 +1570,28 @@ where
}
Command::DeleteConflictLog { since } => {
self.log_store.truncate(since).await?;

// Inform clients waiting for logs to be applied.
let removed = self.client_resp_channels.split_off(&since.index);
if !removed.is_empty() {
let leader_id = self.current_leader();
let leader_node = self.get_leader_node(leader_id);

AsyncRuntimeOf::<C>::spawn(async move {

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / cluster-benchmark (cluster_benchmark)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (memstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (sledstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / coverage

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, loosen-follower-log-revert)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / external-stores (stores/rocksstore-v2)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore-compat07)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (memstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / cluster-benchmark (cluster_benchmark)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (sledstore)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, loosen-follower-log-revert)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / external-stores (stores/rocksstore-v2)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore-compat07)

unused implementer of `futures::Future` that must be used

Check warning on line 1580 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore)

unused implementer of `futures::Future` that must be used
for (log_index, tx) in removed.into_iter() {
let res = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id,
leader_node: leader_node.clone(),
})));

tracing::debug!(
"sent ForwardToLeader for log_index: {}, is_ok: {}",
log_index,
res.is_ok()
);
}
});
}
}
Command::SendVote { vote_req } => {
self.spawn_parallel_vote_requests(&vote_req).await;
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mod raft_inner;
mod runtime_config_handle;
mod trigger;

use std::collections::BTreeMap;

pub(crate) use self::external_request::BoxCoreFn;

pub(in crate::raft) mod core_state;
Expand Down Expand Up @@ -210,6 +212,9 @@ where C: RaftTypeConfig
sm_handle,

engine,

client_resp_channels: BTreeMap::new(),

leader_data: None,

tx_api: tx_api.clone(),
Expand Down
25 changes: 16 additions & 9 deletions tests/tests/append_entries/t10_see_higher_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,27 @@ async fn append_sees_higher_vote() -> Result<()> {
assert!(resp.vote_granted);
}

// Current state:
// n0: vote=(1,0)
// n1: vote=(10,1)
tracing::info!("--- a write operation will see a higher vote, then the leader revert to follower");
{
router.wait(&0, timeout()).state(ServerState::Leader, "node-0 is leader").await?;

let n0 = router.get_raft_handle(&0)?;
let res = n0
.client_write(ClientRequest {
client: "0".to_string(),
serial: 1,
status: "2".to_string(),
})
.await;

tracing::debug!("--- client_write res: {:?}", res);
tokio::spawn(async move {
let res = n0
.client_write(ClientRequest {
client: "0".to_string(),
serial: 1,
status: "2".to_string(),
})
.await;

tracing::debug!("--- client_write res: {:?}", res);
});

tokio::time::sleep(Duration::from_millis(500)).await;

router
.wait(&0, timeout())
Expand Down
1 change: 1 addition & 0 deletions tests/tests/client_api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ mod t12_trigger_purge_log;
mod t13_trigger_snapshot;
mod t16_with_raft_state;
mod t50_lagging_network_write;
mod t51_write_when_leader_quit;
163 changes: 163 additions & 0 deletions tests/tests/client_api/t51_write_when_leader_quit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::error::ClientWriteError;
use openraft::error::ForwardToLeader;
use openraft::error::RaftError;
use openraft::raft::AppendEntriesRequest;
use openraft::testing::log_id;
use openraft::Config;
use openraft::Vote;
use openraft_memstore::ClientRequest;
use openraft_memstore::IntoMemClientRequest;
use tokio::sync::oneshot;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// A pending client write receives a [`ForwardToLeader`] error when the leader quits and the
/// already-appended log entry is reverted.
///
/// Scenario:
/// 1. Build a two-node cluster `{0, 1}` and make node 1 unreachable, so the write issued on
///    node 0 cannot be committed.
/// 2. Issue a client write on node 0 in a background task; its result is forwarded through a
///    oneshot channel.
/// 3. Send node 0 an append-entries request carrying a committed vote of term 10 from node 1
///    and a `prev_log_id` that conflicts with the entry just written.
/// 4. Assert that the write resolves to `ForwardToLeader` pointing at node 1.
///
/// [`ForwardToLeader`]: openraft::error::ForwardToLeader
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn write_when_leader_quit_and_log_revert() -> Result<()> {
// Ticks and heartbeats are switched off; presumably so that nothing but the explicit
// requests below drives the cluster -- confirm against `Config` docs.
let config = Arc::new(
Config {
heartbeat_interval: 100,
election_timeout_min: 200,
election_timeout_max: 300,
enable_tick: false,
enable_heartbeat: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?;

tracing::info!(log_index, "--- block replication so that no log will be committed");
router.set_unreachable(1, true);

// Carries the result of the background `client_write` back to the test body.
let (tx, rx) = oneshot::channel();

tracing::info!(log_index, "--- write a log in another task");
{
let n0 = router.get_raft_handle(&0)?;
// The write cannot complete while replication is blocked, so it must not be awaited
// inline; otherwise the test could never send the append-entries request below.
tokio::spawn(async move {
let res = n0.client_write(ClientRequest::make_request("cli", 1)).await;
tx.send(res).unwrap();
});
}

// Wait for the log to be appended on the leader and the response channel to be installed.
// NOTE(review): this is a fixed delay, not a synchronization point.
tokio::time::sleep(Duration::from_millis(500)).await;

tracing::info!(log_index, "--- force node 0 to give up leadership");
{
let n0 = router.get_raft_handle(&0)?;
let append_res = n0
.append_entries(AppendEntriesRequest {
// From node 1, with a higher term 10
vote: Vote::new_committed(10, 1),
// log_index+1 is the log index the client tried to write in the previous step.
// This log id (term 10, node 1) conflicts with the entry the client wrote at the
// same index, which causes raft to revert the log.
prev_log_id: Some(log_id(10, 1, log_index + 1)),

entries: vec![],
leader_commit: None,
})
.await?;

tracing::info!(log_index, "--- append_res: {:?}", append_res);
}

// The outer `?` covers a dropped sender; `write_res` is the `client_write` result itself.
let write_res = rx.await?;
tracing::info!(log_index, "--- write_res: {:?}", write_res);

// The reported leader is node 1, the node named in the vote of the request above.
let raft_err = write_res.unwrap_err();
assert_eq!(
raft_err,
RaftError::APIError(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id: Some(1),
leader_node: Some(()),
}))
);

Ok(())
}

/// A pending client write still receives an OK response as soon as its log entry is committed,
/// even when the leader has been switched in the meantime.
///
/// Scenario:
/// 1. Build a two-node cluster `{0, 1}` and make node 1 unreachable, so the write issued on
///    node 0 cannot be committed by node 0 itself.
/// 2. Issue a client write on node 0 in a background task; its result is forwarded through a
///    oneshot channel.
/// 3. Send node 0 an append-entries request carrying a committed vote of term 10 from node 1,
///    whose `prev_log_id` matches the entry just written and whose `leader_commit` commits it.
/// 4. Assert that the write resolves to an OK response for that log id, i.e. no
///    [`ForwardToLeader`] error is produced because the log was not reverted.
///
/// [`ForwardToLeader`]: openraft::error::ForwardToLeader
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn write_when_leader_switched() -> Result<()> {
    // Ticks and heartbeats are switched off; presumably so that nothing but the explicit
    // requests below drives the cluster -- confirm against `Config` docs.
    let config = Arc::new(
        Config {
            heartbeat_interval: 100,
            election_timeout_min: 200,
            election_timeout_max: 300,
            enable_tick: false,
            enable_heartbeat: false,
            ..Default::default()
        }
        .validate()?,
    );

    let mut router = RaftRouter::new(config.clone());

    tracing::info!("--- initializing cluster");
    let log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?;

    tracing::info!(log_index, "--- block replication so that no log will be committed");
    router.set_unreachable(1, true);

    // Carries the result of the background `client_write` back to the test body.
    let (tx, rx) = oneshot::channel();

    tracing::info!(log_index, "--- write a log in another task");
    {
        let n0 = router.get_raft_handle(&0)?;
        // The write cannot complete while replication is blocked, so it must not be awaited
        // inline; otherwise the test could never send the append-entries request below.
        tokio::spawn(async move {
            let res = n0.client_write(ClientRequest::make_request("cli", 1)).await;
            tx.send(res).unwrap();
        });
    }

    // Wait for the log to be appended on the leader and the response channel to be installed.
    // NOTE(review): this is a fixed delay, not a synchronization point.
    tokio::time::sleep(Duration::from_millis(500)).await;

    tracing::info!(log_index, "--- force node 0 to give up leadership, inform it to commit");
    {
        let n0 = router.get_raft_handle(&0)?;
        let append_res = n0
            .append_entries(AppendEntriesRequest {
                // From node 1, with a higher term 10
                vote: Vote::new_committed(10, 1),
                // log_index+1 is the log index the client tried to write in the previous step.
                // This matches the log on node-0, so nothing is reverted.
                prev_log_id: Some(log_id(1, 0, log_index + 1)),

                entries: vec![],

                // Inform node-0 to commit the pending log.
                leader_commit: Some(log_id(1, 0, log_index + 1)),
            })
            .await?;

        tracing::info!(log_index, "--- append_res: {:?}", append_res);
    }

    // The outer `?` covers a dropped sender; `write_res` is the `client_write` result itself.
    let write_res = rx.await?;
    tracing::info!(log_index, "--- write_res: {:?}", write_res);

    let ok_resp = write_res?;
    assert_eq!(ok_resp.log_id, log_id(1, 0, log_index + 1), "client write committed");

    Ok(())
}

0 comments on commit 3aa6b98

Please sign in to comment.