Skip to content

Commit

Permalink
Refactor: use type alias to simplify types
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Mar 19, 2024
1 parent 08f258a commit 950de57
Show file tree
Hide file tree
Showing 28 changed files with 91 additions and 129 deletions.
6 changes: 2 additions & 4 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,15 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, ()>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
let new_snapshot = StoredSnapshot {
meta: meta.clone(),
Expand Down
6 changes: 2 additions & 4 deletions examples/raft-kv-memstore-generic-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,15 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
Ok(Box::default())
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!("install snapshot");

Expand Down
6 changes: 2 additions & 4 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,15 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
Ok(Box::default())
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!("install snapshot");

Expand Down
6 changes: 2 additions & 4 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,15 @@ impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
Expand Down
6 changes: 2 additions & 4 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,15 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<SnapshotDataOf<TypeConfig>>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!(
{ snapshot_size = snapshot.get_ref().len() },
Expand Down
39 changes: 14 additions & 25 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ChangeMembers;
Expand Down Expand Up @@ -141,14 +142,14 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
pub(super) replications: BTreeMap<C::NodeId, ReplicationHandle<C>>,

/// The time to send next heartbeat.
pub(crate) next_heartbeat: <C::AsyncRuntime as AsyncRuntime>::Instant,
pub(crate) next_heartbeat: InstantOf<C>,
}

impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
replications: BTreeMap::new(),
next_heartbeat: <C::AsyncRuntime as AsyncRuntime>::Instant::now(),
next_heartbeat: InstantOf::<C>::now(),
}
}
}
Expand Down Expand Up @@ -216,10 +217,7 @@ where
SM: RaftStateMachine<C>,
{
/// The main loop of the Raft protocol.
pub(crate) async fn main(
mut self,
rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
) -> Result<(), Fatal<C::NodeId>> {
pub(crate) async fn main(mut self, rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), Fatal<C::NodeId>> {
let span = tracing::span!(parent: &self.span, Level::DEBUG, "main");
let res = self.do_main(rx_shutdown).instrument(span).await;

Expand All @@ -243,10 +241,7 @@ where
}

#[tracing::instrument(level="trace", skip_all, fields(id=display(self.id), cluster=%self.config.cluster_name))]
async fn do_main(
&mut self,
rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
) -> Result<(), Fatal<C::NodeId>> {
async fn do_main(&mut self, rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!("raft node is initializing");

self.engine.startup();
Expand Down Expand Up @@ -490,18 +485,15 @@ where
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
tracing::debug!(
now = debug(<C::AsyncRuntime as AsyncRuntime>::Instant::now()),
"send_heartbeat"
);
tracing::debug!(now = debug(InstantOf::<C>::now()), "send_heartbeat");

let mut lh = if let Some((lh, _)) =
self.engine.get_leader_handler_or_reject::<(), ClientWriteError<C::NodeId, C::Node>>(None)
{
lh
} else {
tracing::debug!(
now = debug(<C::AsyncRuntime as AsyncRuntime>::Instant::now()),
now = debug(InstantOf::<C>::now()),
"{} failed to send heartbeat",
emitter
);
Expand Down Expand Up @@ -889,10 +881,7 @@ where

/// Run an event handling loop
#[tracing::instrument(level="debug", skip_all, fields(id=display(self.id)))]
async fn runtime_loop(
&mut self,
mut rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
) -> Result<(), Fatal<C::NodeId>> {
async fn runtime_loop(&mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), Fatal<C::NodeId>> {
// Ratio control the ratio of number of RaftMsg to process to number of Notify to process.
let mut balancer = Balancer::new(10_000);

Expand Down Expand Up @@ -1125,7 +1114,7 @@ where
self.handle_append_entries_request(rpc, tx);
}
RaftMsg::RequestVote { rpc, tx } => {
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let now = InstantOf::<C>::now();
tracing::info!(
now = debug(now),
vote_request = display(rpc.summary()),
Expand Down Expand Up @@ -1216,7 +1205,7 @@ where
resp,
sender_vote: vote,
} => {
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let now = InstantOf::<C>::now();

tracing::info!(
now = debug(now),
Expand Down Expand Up @@ -1252,7 +1241,7 @@ where
Notify::Tick { i } => {
// check every timer

let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let now = InstantOf::<C>::now();
tracing::debug!("received tick: {}, now: {:?}", i, now);

self.handle_tick_election();
Expand All @@ -1269,8 +1258,8 @@ where

// Install next heartbeat
if let Some(l) = &mut self.leader_data {
l.next_heartbeat = <C::AsyncRuntime as AsyncRuntime>::Instant::now()
+ Duration::from_millis(self.config.heartbeat_interval);
l.next_heartbeat =
InstantOf::<C>::now() + Duration::from_millis(self.config.heartbeat_interval);
}
}
}
Expand Down Expand Up @@ -1405,7 +1394,7 @@ where

#[tracing::instrument(level = "debug", skip_all)]
fn handle_tick_election(&mut self) {
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let now = InstantOf::<C>::now();

tracing::debug!("try to trigger election by tick, now: {:?}", now);

Expand Down
6 changes: 2 additions & 4 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ use crate::raft::ClientWriteResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::NodeOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::MessageSummary;
use crate::RaftTypeConfig;
Expand All @@ -30,8 +29,7 @@ pub(crate) mod external_command;
/// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`.
pub(crate) type ResultSender<C, T, E = Infallible> = OneshotSenderOf<C, Result<T, E>>;

pub(crate) type ResultReceiver<C, T, E = Infallible> =
<AsyncRuntimeOf<C> as AsyncRuntime>::OneshotReceiver<Result<T, E>>;
pub(crate) type ResultReceiver<C, T, E = Infallible> = OneshotReceiverOf<C, Result<T, E>>;

/// TX for Vote Response
pub(crate) type VoteTx<C> = ResultSender<C, VoteResponse<NodeIdOf<C>>>;
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ pub(crate) use response::Response;

use crate::core::notify::Notify;
use crate::core::raft_msg::ResultSender;
use crate::type_config::alias::JoinHandleOf;

/// State machine worker handle for sending command to it.
pub(crate) struct Handle<C>
where C: RaftTypeConfig
{
cmd_tx: mpsc::UnboundedSender<Command<C>>,
#[allow(dead_code)]
join_handle: <C::AsyncRuntime as AsyncRuntime>::JoinHandle<()>,
join_handle: JoinHandleOf<C, ()>,
}

impl<C> Handle<C>
Expand Down Expand Up @@ -81,7 +82,7 @@ where
Handle { cmd_tx, join_handle }
}

fn do_spawn(mut self) -> <C::AsyncRuntime as AsyncRuntime>::JoinHandle<()> {
fn do_spawn(mut self) -> JoinHandleOf<C, ()> {
C::AsyncRuntime::spawn(async move {
let res = self.worker_loop().await;

Expand Down
15 changes: 6 additions & 9 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use crate::raft::VoteResponse;
use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::summary::MessageSummary;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::AsyncRuntime;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
Expand Down Expand Up @@ -67,7 +67,7 @@ where C: RaftTypeConfig
pub(crate) config: EngineConfig<C::NodeId>,

/// The state of this raft node.
pub(crate) state: Valid<RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>>,
pub(crate) state: Valid<RaftState<C::NodeId, C::Node, InstantOf<C>>>,

// TODO: add a Voting state as a container.
/// Whether a greater log id is seen during election.
Expand All @@ -77,7 +77,7 @@ where C: RaftTypeConfig
pub(crate) seen_greater_log: bool,

/// The internal server state used by Engine.
pub(crate) internal_server_state: InternalServerState<C::NodeId, <C::AsyncRuntime as AsyncRuntime>::Instant>,
pub(crate) internal_server_state: InternalServerState<C::NodeId, InstantOf<C>>,

/// Output entry for the runtime.
pub(crate) output: EngineOutput<C>,
Expand All @@ -87,7 +87,7 @@ impl<C> Engine<C>
where C: RaftTypeConfig
{
pub(crate) fn new(
init_state: RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
init_state: RaftState<C::NodeId, C::Node, InstantOf<C>>,
config: EngineConfig<C::NodeId>,
) -> Self {
Self {
Expand Down Expand Up @@ -193,10 +193,7 @@ where C: RaftTypeConfig

// Safe unwrap(): leading state is just created
let leading = self.internal_server_state.leading_mut().unwrap();
let voting = leading.initialize_voting(
self.state.last_log_id().copied(),
<C::AsyncRuntime as AsyncRuntime>::Instant::now(),
);
let voting = leading.initialize_voting(self.state.last_log_id().copied(), InstantOf::<C>::now());

let quorum_granted = voting.grant_by(&self.config.id);

Expand Down Expand Up @@ -248,7 +245,7 @@ where C: RaftTypeConfig

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_vote_req(&mut self, req: VoteRequest<C::NodeId>) -> VoteResponse<C::NodeId> {
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let now = InstantOf::<C>::now();
let lease = self.config.timer_config.leader_lease;
let vote = self.state.vote_ref();

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
use crate::error::RejectAppendEntries;
use crate::raft_state::LogStateReader;
use crate::AsyncRuntime;
use crate::type_config::alias::InstantOf;
use crate::EffectiveMembership;
use crate::LogId;
use crate::LogIdOptionExt;
Expand All @@ -37,7 +37,7 @@ pub(crate) struct FollowingHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, InstantOf<C>>,
pub(crate) output: &'x mut EngineOutput<C>,
}

Expand Down
7 changes: 3 additions & 4 deletions openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
use crate::internal_server_state::LeaderQuorumSet;
use crate::leader::Leading;
use crate::AsyncRuntime;
use crate::type_config::alias::InstantOf;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;
Expand All @@ -24,9 +24,8 @@ pub(crate) struct LeaderHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) leader:
&'x mut Leading<C::NodeId, LeaderQuorumSet<C::NodeId>, <C::AsyncRuntime as AsyncRuntime>::Instant>,
pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
pub(crate) leader: &'x mut Leading<C::NodeId, LeaderQuorumSet<C::NodeId>, InstantOf<C>>,
pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, InstantOf<C>>,
pub(crate) output: &'x mut EngineOutput<C>,
}

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/log_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::raft_state::LogStateReader;
use crate::summary::MessageSummary;
use crate::AsyncRuntime;
use crate::type_config::alias::InstantOf;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftState;
Expand All @@ -17,7 +17,7 @@ pub(crate) struct LogHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, InstantOf<C>>,
pub(crate) output: &'x mut EngineOutput<C>,
}

Expand Down
Loading

0 comments on commit 950de57

Please sign in to comment.