Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change: remove AsyncRuntime::abort() #1012

Merged
merged 2 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static
/// Check if the [`Self::JoinError`] is `panic`.
fn is_panic(join_error: &Self::JoinError) -> bool;

/// Abort the task associated with the supplied join handle.
fn abort<T: OptionalSend + 'static>(join_handle: &Self::JoinHandle<T>);

/// Get the random number generator to use for generating random numbers.
///
/// # Note
Expand Down Expand Up @@ -131,11 +128,6 @@ impl AsyncRuntime for TokioRuntime {
join_error.is_panic()
}

#[inline]
fn abort<T: OptionalSend + 'static>(join_handle: &Self::JoinHandle<T>) {
join_handle.abort();
}

#[inline]
fn thread_rng() -> Self::ThreadLocalRng {
rand::thread_rng()
Expand Down
134 changes: 124 additions & 10 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use futures::future::Either;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::core::notify::Notify;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::Instant;
use crate::RaftTypeConfig;
Expand All @@ -31,7 +37,17 @@
where C: RaftTypeConfig
{
enabled: Arc<AtomicBool>,
join_handle: <C::AsyncRuntime as AsyncRuntime>::JoinHandle<()>,
shutdown: Mutex<Option<oneshot::Sender<()>>>,
join_handle: Mutex<Option<JoinHandleOf<C, ()>>>,
}

impl<C> Drop for TickHandle<C>
where C: RaftTypeConfig
{
/// Signal the tick loop to stop, without waiting for it to stop.
fn drop(&mut self) {
let _ = self.shutdown();
}
}

impl<C> Tick<C>
Expand All @@ -44,27 +60,51 @@
enabled: enabled.clone(),
tx,
};
let join_handle = C::AsyncRuntime::spawn(this.tick_loop().instrument(tracing::span!(

let (shutdown, shutdown_rx) = oneshot::channel();

let shutdown = Mutex::new(Some(shutdown));

let join_handle = AsyncRuntimeOf::<C>::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
parent: &Span::current(),
Level::DEBUG,
"tick"
)));
TickHandle { enabled, join_handle }

TickHandle {
enabled,
shutdown,
join_handle: Mutex::new(Some(join_handle)),
}
}

pub(crate) async fn tick_loop(self) {
pub(crate) async fn tick_loop(self, mut cancel_rx: oneshot::Receiver<()>) {
let mut i = 0;

let mut cancel = std::pin::pin!(cancel_rx);

loop {
i += 1;
let at = InstantOf::<C>::now() + self.interval;
let mut sleep_fut = AsyncRuntimeOf::<C>::sleep_until(at);
let sleep_fut = std::pin::pin!(sleep_fut);
let cancel_fut = cancel.as_mut();

let at = <C::AsyncRuntime as AsyncRuntime>::Instant::now() + self.interval;
C::AsyncRuntime::sleep_until(at).await;
match futures::future::select(cancel_fut, sleep_fut).await {
Either::Left((_canceled, _)) => {
tracing::info!("TickLoop received cancel signal, quit");
return;
}
Either::Right((_, _)) => {
// sleep done
}
}

if !self.enabled.load(Ordering::Relaxed) {
i -= 1;
continue;
}

i += 1;

let send_res = self.tx.send(Notify::Tick { i });
if let Err(_e) = send_res {
tracing::info!("Stopping tick_loop(), main loop terminated");
Expand All @@ -83,7 +123,81 @@
self.enabled.store(enabled, Ordering::Relaxed);
}

pub(crate) async fn shutdown(&self) {
C::AsyncRuntime::abort(&self.join_handle);
/// Signal the tick loop to stop. And return a JoinHandle to wait for the loop to stop.
///
/// If it is called twice, the second call will return None.
pub(crate) fn shutdown(&self) -> Option<JoinHandleOf<C, ()>> {
{
let shutdown = {
let mut x = self.shutdown.lock().unwrap();
x.take()
};

if let Some(shutdown) = shutdown {
let send_res = shutdown.send(());
tracing::info!("Timer shutdown signal sent: {send_res:?}");
} else {
tracing::warn!("Double call to Raft::shutdown()");
}
}

let jh = {
let mut x = self.join_handle.lock().unwrap();
x.take()
};
jh
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;

use tokio::time::Duration;

Check warning on line 156 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `tokio::time::Duration`

use crate::core::Tick;

Check warning on line 158 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::core::Tick`
use crate::type_config::alias::AsyncRuntimeOf;

Check warning on line 159 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::type_config::alias::AsyncRuntimeOf`
use crate::AsyncRuntime;

Check warning on line 160 in openraft/src/core/tick.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused import: `crate::AsyncRuntime`
use crate::RaftTypeConfig;
use crate::TokioRuntime;

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub(crate) struct TickUTConfig {}
impl RaftTypeConfig for TickUTConfig {
type D = ();
type R = ();
type NodeId = u64;
type Node = ();
type Entry = crate::Entry<TickUTConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
}

// AsyncRuntime::spawn is `spawn_local` with singlethreaded enabled.
// It will result in a panic:
// `spawn_local` called from outside of a `task::LocalSet`.
#[cfg(not(feature = "singlethreaded"))]
#[tokio::test]
async fn test_shutdown() -> anyhow::Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let th = Tick::<TickUTConfig>::spawn(Duration::from_millis(100), tx, true);

AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
let _ = th.shutdown().unwrap().await;
AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;

let mut received = vec![];
while let Some(x) = rx.recv().await {
received.push(x);
}

assert!(
received.len() < 10,
"no more tick will be received after shutdown: {}",
received.len()
);

Ok(())
}
}
4 changes: 3 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,9 @@ where C: RaftTypeConfig
tracing::info!("sending shutdown signal to RaftCore, sending res: {:?}", send_res);
}
self.inner.join_core_task().await;
self.inner.tick_handle.shutdown().await;
if let Some(join_handle) = self.inner.tick_handle.shutdown() {
let _ = join_handle.await;
}

// TODO(xp): API change: replace `JoinError` with `Fatal`,
// to let the caller know the return value of RaftCore task.
Expand Down
Loading