Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: For single-threaded execution, there is no need for Send/Sync bounds #934

Merged
merged 1 commit into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use crate::TokioInstant;
/// ## Note
///
/// The default asynchronous runtime is `tokio`.
pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {
pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static {
/// The error type of [`Self::JoinHandle`].
type JoinError: Debug + Display + Send;
type JoinError: Debug + Display + OptionalSend;

/// The return type of [`Self::spawn`].
type JoinHandle<T: OptionalSend + 'static>: Future<Output = Result<T, Self::JoinError>>
Expand All @@ -33,7 +33,7 @@ pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {
type Instant: Instant;

/// The timeout error type.
type TimeoutError: Debug + Display + Send;
type TimeoutError: Debug + Display + OptionalSend;

/// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user
/// to await the outcome of a [`Future`].
Expand Down
8 changes: 7 additions & 1 deletion openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,19 @@ where
tx: ResultSender<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
},

#[allow(clippy::type_complexity)]
ExternalRequest {
#[allow(clippy::type_complexity)]
#[cfg(not(feature = "singlethreaded"))]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ Send
+ 'static,
>,
#[cfg(feature = "singlethreaded")]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ 'static,
>,
},

ExternalCommand {
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/defensive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use std::ops::RangeBounds;
use crate::log_id::RaftLogId;
use crate::DefensiveError;
use crate::ErrorSubject;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Violation;

pub fn check_range_matches_entries<C: RaftTypeConfig, RB: RangeBounds<u64> + Debug + Send>(
pub fn check_range_matches_entries<C: RaftTypeConfig, RB: RangeBounds<u64> + Debug + OptionalSend>(
range: RB,
entries: &[C::Entry],
) -> Result<(), StorageError<C::NodeId>> {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/docs/feature_flags/feature-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ By default openraft enables no features.
V2 storage separates log store and state machine store so that log IO and state machine IO can be parallelized naturally.
<br/><br/>

- `singlethreaded`: removes `Send` bounds from `AppData`, `AppDataResponse`, `RaftEntry`, and `SnapshotData` to force the
asynchronous runtime to spawn any tasks in the current thread.
- `singlethreaded`: removes `Send` and `Sync` bounds from `AppData`, `AppDataResponse`, `RaftEntry`, `SnapshotData`
and other types to force the asynchronous runtime to spawn any tasks in the current thread.
This is for any single-threaded application that never allows a raft instance to be shared among multiple threads.
In order to use the feature, `AsyncRuntime::spawn` should invoke `tokio::task::spawn_local` or equivalents.
<br/><br/>
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/entry/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::Node;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSerde;
use crate::OptionalSync;

/// Defines operations on an entry payload.
pub trait RaftPayload<NID, N>
Expand All @@ -27,7 +28,7 @@ pub trait RaftEntry<NID, N>: RaftPayload<NID, N> + RaftLogId<NID>
where
N: Node,
NID: NodeId,
Self: OptionalSerde + Debug + Display + OptionalSend + Sync,
Self: OptionalSerde + Debug + Display + OptionalSend + OptionalSync,
{
/// Create a new blank log entry.
///
Expand Down
7 changes: 5 additions & 2 deletions openraft/src/instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use std::panic::RefUnwindSafe;
use std::panic::UnwindSafe;
use std::time::Duration;

use crate::OptionalSend;
use crate::OptionalSync;

/// A measurement of a monotonically non-decreasing clock.
pub trait Instant:
Add<Duration, Output = Self>
Expand All @@ -19,11 +22,11 @@ pub trait Instant:
+ PartialEq
+ PartialOrd
+ RefUnwindSafe
+ Send
+ OptionalSend
+ Sub<Duration, Output = Self>
+ Sub<Self, Output = Duration>
+ SubAssign<Duration>
+ Sync
+ OptionalSync
+ Unpin
+ UnwindSafe
+ 'static
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ impl<T: Sync + ?Sized> OptionalSync for T {}
/// ## Note
///
/// The trait is automatically implemented for all types which satisfy its supertraits.
pub trait AppData: OptionalSend + Sync + 'static + OptionalSerde {}
pub trait AppData: OptionalSend + OptionalSync + 'static + OptionalSerde {}

impl<T> AppData for T where T: OptionalSend + Sync + 'static + OptionalSerde {}
impl<T> AppData for T where T: OptionalSend + OptionalSync + 'static + OptionalSerde {}

/// A trait defining application specific response data.
///
Expand All @@ -210,6 +210,6 @@ impl<T> AppData for T where T: OptionalSend + Sync + 'static + OptionalSerde {}
/// ## Note
///
/// The trait is automatically implemented for all types which satisfy its supertraits.
pub trait AppDataResponse: OptionalSend + Sync + 'static + OptionalSerde {}
pub trait AppDataResponse: OptionalSend + OptionalSync + 'static + OptionalSerde {}

impl<T> AppDataResponse for T where T: OptionalSend + Sync + 'static + OptionalSerde {}
impl<T> AppDataResponse for T where T: OptionalSend + OptionalSync + 'static + OptionalSerde {}
7 changes: 6 additions & 1 deletion openraft/src/network/backoff.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::time::Duration;

use crate::OptionalSend;

/// A backoff instance that is an infinite iterator of durations to sleep before next retry, when a
/// [`Unreachable`](`crate::error::Unreachable`) occurs.
pub struct Backoff {
#[cfg(not(feature = "singlethreaded"))]
inner: Box<dyn Iterator<Item = Duration> + Send + 'static>,
#[cfg(feature = "singlethreaded")]
inner: Box<dyn Iterator<Item = Duration> + 'static>,
}

impl Backoff {
pub fn new(iter: impl Iterator<Item = Duration> + Send + 'static) -> Self {
pub fn new(iter: impl Iterator<Item = Duration> + OptionalSend + 'static) -> Self {
Self { inner: Box::new(iter) }
}
}
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/network/factory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use macros::add_async_trait;

use crate::network::RaftNetwork;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;

/// A trait defining the interface for a Raft network factory to create connections between cluster
Expand All @@ -12,7 +14,7 @@ use crate::RaftTypeConfig;
/// Typically, the network implementation as such will be hidden behind a `Box<T>` or `Arc<T>` and
/// this interface implemented on the `Box<T>` or `Arc<T>`.
#[add_async_trait]
pub trait RaftNetworkFactory<C>: Send + Sync + 'static
pub trait RaftNetworkFactory<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Actual type of the network handling a single connection.
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;

/// A trait defining the interface for a Raft network between cluster members.
Expand All @@ -36,7 +37,7 @@ use crate::RaftTypeConfig;
///
/// - Implementing the new APIs will disable the old APIs.
#[add_async_trait]
pub trait RaftNetwork<C>: OptionalSend + Sync + 'static
pub trait RaftNetwork<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Send an AppendEntries RPC to the target.
Expand Down
30 changes: 25 additions & 5 deletions openraft/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,32 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::hash::Hash;

use crate::OptionalSend;
use crate::OptionalSync;

/// Essential trait bound for node-id, except serde.
#[doc(hidden)]
pub trait NodeIdEssential:
Sized + Send + Sync + Eq + PartialEq + Ord + PartialOrd + Debug + Display + Hash + Copy + Clone + Default + 'static
Sized
+ OptionalSend
+ OptionalSync
+ Eq
+ PartialEq
+ Ord
+ PartialOrd
+ Debug
+ Display
+ Hash
+ Copy
+ Clone
+ Default
+ 'static
{
}

impl<T> NodeIdEssential for T where T: Sized
+ Send
+ Sync
+ OptionalSend
+ OptionalSync
+ Eq
+ PartialEq
+ Ord
Expand Down Expand Up @@ -43,8 +59,12 @@ pub trait NodeId: NodeIdEssential {}
impl<T> NodeId for T where T: NodeIdEssential {}

/// Essential trait bound for application level node-data, except serde.
pub trait NodeEssential: Sized + Send + Sync + Eq + PartialEq + Debug + Clone + Default + 'static {}
impl<T> NodeEssential for T where T: Sized + Send + Sync + Eq + PartialEq + Debug + Clone + Default + 'static {}
pub trait NodeEssential:
Sized + OptionalSend + OptionalSync + Eq + PartialEq + Debug + Clone + Default + 'static
{
}
impl<T> NodeEssential for T where T: Sized + OptionalSend + OptionalSync + Eq + PartialEq + Debug + Clone + Default + 'static
{}

/// A Raft `Node`, this trait holds all relevant node information.
///
Expand Down
44 changes: 43 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::ChangeMembers;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::OptionalSend;
use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::StorageHelper;
Expand Down Expand Up @@ -149,6 +150,47 @@ where
}
}

#[cfg(feature = "singlethreaded")]
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// The API object just sends the requests to the Raft loop over a channel. If all the relevant
// types in the type config are `Send`, then it's safe to send the request across threads over
// the channel.
//
// Notably, the state machine, log storage and network factory DO NOT have to be `Send`, those
// are only used within Raft task(s) on a single thread.
unsafe impl<C, N, LS, SM> Send for Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
C::Node: Send + Sync,
C::NodeId: Send + Sync,
C::R: Send,
{
}

#[cfg(feature = "singlethreaded")]
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// See above for details.
unsafe impl<C, N, LS, SM> Sync for Raft<C, N, LS, SM>
where
C: RaftTypeConfig + Send,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
C::Node: Send + Sync,
C::NodeId: Send + Sync,
C::R: Send,
{
}

impl<C, N, LS, SM> Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
Expand Down Expand Up @@ -698,7 +740,7 @@ where
/// destroyed right away and not called at all.
pub fn external_request<
F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ Send
+ OptionalSend
+ 'static,
>(
&self,
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/storage/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::storage::RaftStateMachine;
use crate::LogId;
use crate::LogState;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftLogReader;
use crate::RaftStorage;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -103,7 +104,7 @@ where
C: RaftTypeConfig,
S: RaftStorage<C>,
{
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/storage/log_store_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use macros::add_async_trait;

use crate::defensive::check_range_matches_entries;
use crate::LogId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftLogId;
use crate::RaftLogReader;
use crate::RaftTypeConfig;
Expand All @@ -26,7 +28,7 @@ where C: RaftTypeConfig
///
/// Similar to `try_get_log_entries` except an error will be returned if there is an entry not
/// found in the specified range.
async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StoredMembership;
Expand Down Expand Up @@ -138,7 +139,7 @@ pub struct LogState<C: RaftTypeConfig> {
/// this interface implemented on the `Arc<T>`. It can be co-implemented with [`RaftStorage`]
/// interface on the same cloneable object, if the underlying state machine is anyway synchronized.
#[add_async_trait]
pub trait RaftLogReader<C>: Send + Sync + 'static
pub trait RaftLogReader<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Get a series of log entries from storage.
Expand All @@ -147,7 +148,7 @@ where C: RaftTypeConfig
/// stop)`.
///
/// Entry that is not found is allowed.
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>;
Expand All @@ -162,7 +163,7 @@ where C: RaftTypeConfig
/// co-implemented with [`RaftStorage`] interface on the same cloneable object, if the underlying
/// state machine is anyway synchronized.
#[add_async_trait]
pub trait RaftSnapshotBuilder<C>: Send + Sync + 'static
pub trait RaftSnapshotBuilder<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Build snapshot
Expand Down Expand Up @@ -192,7 +193,7 @@ where C: RaftTypeConfig
/// The implementation of the API has to cope with (infrequent) concurrent access from these two
/// components.
#[add_async_trait]
pub trait RaftStorage<C>: RaftLogReader<C> + Send + Sync + 'static
pub trait RaftStorage<C>: RaftLogReader<C> + OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Log reader type.
Expand Down
Loading
Loading