diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index fb3815e1a..294d588d1 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -278,9 +278,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -288,7 +286,7 @@ impl RaftStateMachine for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { let new_snapshot = StoredSnapshot { meta: meta.clone(), diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs index 6755de6fc..e802ab244 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs @@ -169,9 +169,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { Ok(Box::default()) } @@ -179,7 +177,7 @@ impl RaftStateMachine for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { tracing::info!("install snapshot"); diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs index fee80b0f7..f27874e7d 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs @@ -190,9 +190,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { Ok(Box::default()) } @@ -200,7 +198,7 @@ impl RaftStateMachine for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { tracing::info!("install snapshot"); diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index 227fba770..0ec45f1dc 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -223,9 +223,7 @@ impl RaftStateMachine for Rc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -233,7 +231,7 @@ impl RaftStateMachine for Rc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index b64ea4427..92af24358 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -176,9 +176,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -186,7 +184,7 @@ impl RaftStateMachine for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 32db67c38..1dc9e481b 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -84,6 +84,7 @@ use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; +use crate::type_config::alias::OneshotReceiverOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::ChangeMembers; @@ -141,14 +142,14 @@ pub(crate) struct LeaderData { pub(super) replications: BTreeMap>, /// The time to send next heartbeat. - pub(crate) next_heartbeat: ::Instant, + pub(crate) next_heartbeat: InstantOf, } impl LeaderData { pub(crate) fn new() -> Self { Self { replications: BTreeMap::new(), - next_heartbeat: ::Instant::now(), + next_heartbeat: InstantOf::::now(), } } } @@ -216,10 +217,7 @@ where SM: RaftStateMachine, { /// The main loop of the Raft protocol. - pub(crate) async fn main( - mut self, - rx_shutdown: ::OneshotReceiver<()>, - ) -> Result<(), Fatal> { + pub(crate) async fn main(mut self, rx_shutdown: OneshotReceiverOf) -> Result<(), Fatal> { let span = tracing::span!(parent: &self.span, Level::DEBUG, "main"); let res = self.do_main(rx_shutdown).instrument(span).await; @@ -243,10 +241,7 @@ where } #[tracing::instrument(level="trace", skip_all, fields(id=display(self.id), cluster=%self.config.cluster_name))] - async fn do_main( - &mut self, - rx_shutdown: ::OneshotReceiver<()>, - ) -> Result<(), Fatal> { + async fn do_main(&mut self, rx_shutdown: OneshotReceiverOf) -> Result<(), Fatal> { tracing::debug!("raft node is initializing"); self.engine.startup(); @@ -490,10 +485,7 @@ where /// Currently heartbeat is a blank log #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool { - tracing::debug!( - now = debug(::Instant::now()), - "send_heartbeat" - ); + tracing::debug!(now = debug(InstantOf::::now()), "send_heartbeat"); let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject::<(), ClientWriteError>(None) @@ -501,7 +493,7 @@ where lh } else { tracing::debug!( - now = debug(::Instant::now()), + now = debug(InstantOf::::now()), "{} failed to send heartbeat", emitter ); @@ -889,10 +881,7 @@ where /// Run an event handling loop #[tracing::instrument(level="debug", skip_all, fields(id=display(self.id)))] - async fn runtime_loop( - &mut self, - mut rx_shutdown: ::OneshotReceiver<()>, - ) -> Result<(), Fatal> { + async fn runtime_loop(&mut self, mut rx_shutdown: OneshotReceiverOf) -> Result<(), Fatal> { // Ratio control the ratio of number of RaftMsg to process to number of Notify to process. let mut balancer = Balancer::new(10_000); @@ -1125,7 +1114,7 @@ where self.handle_append_entries_request(rpc, tx); } RaftMsg::RequestVote { rpc, tx } => { - let now = ::Instant::now(); + let now = InstantOf::::now(); tracing::info!( now = debug(now), vote_request = display(rpc.summary()), @@ -1216,7 +1205,7 @@ where resp, sender_vote: vote, } => { - let now = ::Instant::now(); + let now = InstantOf::::now(); tracing::info!( now = debug(now), @@ -1252,7 +1241,7 @@ where Notify::Tick { i } => { // check every timer - let now = ::Instant::now(); + let now = InstantOf::::now(); tracing::debug!("received tick: {}, now: {:?}", i, now); self.handle_tick_election(); @@ -1269,8 +1258,8 @@ where // Install next heartbeat if let Some(l) = &mut self.leader_data { - l.next_heartbeat = ::Instant::now() - + Duration::from_millis(self.config.heartbeat_interval); + l.next_heartbeat = + InstantOf::::now() + Duration::from_millis(self.config.heartbeat_interval); } } } @@ -1405,7 +1394,7 @@ where #[tracing::instrument(level = "debug", skip_all)] fn handle_tick_election(&mut self) { - let now = ::Instant::now(); + let now = InstantOf::::now(); tracing::debug!("try to trigger election by tick, now: {:?}", now); diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index ecffbda28..1b2c5d66d 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -12,13 +12,12 @@ use crate::raft::ClientWriteResponse; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::NodeIdOf; use crate::type_config::alias::NodeOf; +use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; use crate::type_config::alias::SnapshotDataOf; -use crate::AsyncRuntime; use crate::ChangeMembers; use crate::MessageSummary; use crate::RaftTypeConfig; @@ -30,8 +29,7 @@ pub(crate) mod external_command; /// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`. pub(crate) type ResultSender = OneshotSenderOf>; -pub(crate) type ResultReceiver = - as AsyncRuntime>::OneshotReceiver>; +pub(crate) type ResultReceiver = OneshotReceiverOf>; /// TX for Vote Response pub(crate) type VoteTx = ResultSender>>; diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index af810f6c0..871d56425 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -30,6 +30,7 @@ pub(crate) use response::Response; use crate::core::notify::Notify; use crate::core::raft_msg::ResultSender; +use crate::type_config::alias::JoinHandleOf; /// State machine worker handle for sending command to it. pub(crate) struct Handle @@ -37,7 +38,7 @@ where C: RaftTypeConfig { cmd_tx: mpsc::UnboundedSender>, #[allow(dead_code)] - join_handle: ::JoinHandle<()>, + join_handle: JoinHandleOf, } impl Handle @@ -81,7 +82,7 @@ where Handle { cmd_tx, join_handle } } - fn do_spawn(mut self) -> ::JoinHandle<()> { + fn do_spawn(mut self) -> JoinHandleOf { C::AsyncRuntime::spawn(async move { let res = self.worker_loop().await; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index d9cf0fb09..ce03003cd 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -38,8 +38,8 @@ use crate::raft::VoteResponse; use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; use crate::summary::MessageSummary; +use crate::type_config::alias::InstantOf; use crate::type_config::alias::SnapshotDataOf; -use crate::AsyncRuntime; use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; @@ -67,7 +67,7 @@ where C: RaftTypeConfig pub(crate) config: EngineConfig, /// The state of this raft node. - pub(crate) state: Valid::Instant>>, + pub(crate) state: Valid>>, // TODO: add a Voting state as a container. /// Whether a greater log id is seen during election. @@ -77,7 +77,7 @@ where C: RaftTypeConfig pub(crate) seen_greater_log: bool, /// The internal server state used by Engine. - pub(crate) internal_server_state: InternalServerState::Instant>, + pub(crate) internal_server_state: InternalServerState>, /// Output entry for the runtime. pub(crate) output: EngineOutput, @@ -87,7 +87,7 @@ impl Engine where C: RaftTypeConfig { pub(crate) fn new( - init_state: RaftState::Instant>, + init_state: RaftState>, config: EngineConfig, ) -> Self { Self { @@ -193,10 +193,7 @@ where C: RaftTypeConfig // Safe unwrap(): leading state is just created let leading = self.internal_server_state.leading_mut().unwrap(); - let voting = leading.initialize_voting( - self.state.last_log_id().copied(), - ::Instant::now(), - ); + let voting = leading.initialize_voting(self.state.last_log_id().copied(), InstantOf::::now()); let quorum_granted = voting.grant_by(&self.config.id); @@ -248,7 +245,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn handle_vote_req(&mut self, req: VoteRequest) -> VoteResponse { - let now = ::Instant::now(); + let now = InstantOf::::now(); let lease = self.config.timer_config.leader_lease; let vote = self.state.vote_ref(); diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 8780dd77b..9acac21af 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -12,7 +12,7 @@ use crate::engine::EngineOutput; use crate::entry::RaftPayload; use crate::error::RejectAppendEntries; use crate::raft_state::LogStateReader; -use crate::AsyncRuntime; +use crate::type_config::alias::InstantOf; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; @@ -37,7 +37,7 @@ pub(crate) struct FollowingHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) state: &'x mut RaftState::Instant>, + pub(crate) state: &'x mut RaftState>, pub(crate) output: &'x mut EngineOutput, } diff --git a/openraft/src/engine/handler/leader_handler/mod.rs b/openraft/src/engine/handler/leader_handler/mod.rs index 61a50188d..b3ba6fc9e 100644 --- a/openraft/src/engine/handler/leader_handler/mod.rs +++ b/openraft/src/engine/handler/leader_handler/mod.rs @@ -7,7 +7,7 @@ use crate::engine::EngineOutput; use crate::entry::RaftPayload; use crate::internal_server_state::LeaderQuorumSet; use crate::leader::Leading; -use crate::AsyncRuntime; +use crate::type_config::alias::InstantOf; use crate::RaftLogId; use crate::RaftState; use crate::RaftTypeConfig; @@ -24,9 +24,8 @@ pub(crate) struct LeaderHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) leader: - &'x mut Leading, ::Instant>, - pub(crate) state: &'x mut RaftState::Instant>, + pub(crate) leader: &'x mut Leading, InstantOf>, + pub(crate) state: &'x mut RaftState>, pub(crate) output: &'x mut EngineOutput, } diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs index 93eedf78a..aa1778d12 100644 --- a/openraft/src/engine/handler/log_handler/mod.rs +++ b/openraft/src/engine/handler/log_handler/mod.rs @@ -3,7 +3,7 @@ use crate::engine::EngineConfig; use crate::engine::EngineOutput; use crate::raft_state::LogStateReader; use crate::summary::MessageSummary; -use crate::AsyncRuntime; +use crate::type_config::alias::InstantOf; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftState; @@ -17,7 +17,7 @@ pub(crate) struct LogHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) state: &'x mut RaftState::Instant>, + pub(crate) state: &'x mut RaftState>, pub(crate) output: &'x mut EngineOutput, } diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index fbf52e396..2423e7254 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -15,8 +15,8 @@ use crate::progress::Progress; use crate::raft_state::LogStateReader; use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; +use crate::type_config::alias::InstantOf; use crate::utime::UTime; -use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; @@ -41,9 +41,8 @@ pub(crate) struct ReplicationHandler<'x, C> where C: RaftTypeConfig { pub(crate) config: &'x mut EngineConfig, - pub(crate) leader: - &'x mut Leading, ::Instant>, - pub(crate) state: &'x mut RaftState::Instant>, + pub(crate) leader: &'x mut Leading, InstantOf>, + pub(crate) state: &'x mut RaftState>, pub(crate) output: &'x mut EngineOutput, } @@ -141,7 +140,7 @@ where C: RaftTypeConfig &mut self, target: C::NodeId, request_id: RequestId, - result: UTime, ::Instant>, + result: UTime, InstantOf>, ) { let sending_time = result.utime().unwrap(); @@ -167,11 +166,7 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_leader_vote_clock( - &mut self, - node_id: C::NodeId, - t: ::Instant, - ) { + pub(crate) fn update_leader_vote_clock(&mut self, node_id: C::NodeId, t: InstantOf) { tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!()); let granted = *self @@ -286,7 +281,7 @@ where C: RaftTypeConfig &mut self, target: C::NodeId, request_id: RequestId, - repl_res: Result, ::Instant>, String>, + repl_res: Result, InstantOf>, String>, ) { // TODO(2): test diff --git a/openraft/src/engine/handler/server_state_handler/mod.rs b/openraft/src/engine/handler/server_state_handler/mod.rs index a604c99ed..c9ef3a92d 100644 --- a/openraft/src/engine/handler/server_state_handler/mod.rs +++ b/openraft/src/engine/handler/server_state_handler/mod.rs @@ -1,7 +1,7 @@ use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; -use crate::AsyncRuntime; +use crate::type_config::alias::InstantOf; use crate::RaftState; use crate::RaftTypeConfig; use crate::ServerState; @@ -13,7 +13,7 @@ pub(crate) struct ServerStateHandler<'st, C> where C: RaftTypeConfig { pub(crate) config: &'st EngineConfig, - pub(crate) state: &'st mut RaftState::Instant>, + pub(crate) state: &'st mut RaftState>, pub(crate) output: &'st mut EngineOutput, } diff --git a/openraft/src/engine/handler/snapshot_handler/mod.rs b/openraft/src/engine/handler/snapshot_handler/mod.rs index 250e78d39..138e8e098 100644 --- a/openraft/src/engine/handler/snapshot_handler/mod.rs +++ b/openraft/src/engine/handler/snapshot_handler/mod.rs @@ -5,7 +5,7 @@ use crate::engine::Command; use crate::engine::EngineOutput; use crate::raft_state::LogStateReader; use crate::summary::MessageSummary; -use crate::AsyncRuntime; +use crate::type_config::alias::InstantOf; use crate::RaftState; use crate::RaftTypeConfig; use crate::SnapshotMeta; @@ -17,7 +17,7 @@ use crate::SnapshotMeta; pub(crate) struct SnapshotHandler<'st, 'out, C> where C: RaftTypeConfig { - pub(crate) state: &'st mut RaftState::Instant>, + pub(crate) state: &'st mut RaftState>, pub(crate) output: &'out mut EngineOutput, } diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index aa6946345..de61184bc 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -12,7 +12,6 @@ use crate::internal_server_state::InternalServerState; use crate::leader::Leading; use crate::raft_state::LogStateReader; use crate::type_config::alias::InstantOf; -use crate::AsyncRuntime; use crate::Instant; use crate::OptionalSend; use crate::RaftState; @@ -30,10 +29,9 @@ pub(crate) struct VoteHandler<'st, C> where C: RaftTypeConfig { pub(crate) config: &'st EngineConfig, - pub(crate) state: &'st mut RaftState::Instant>, + pub(crate) state: &'st mut RaftState>, pub(crate) output: &'st mut EngineOutput, - pub(crate) internal_server_state: - &'st mut InternalServerState::Instant>, + pub(crate) internal_server_state: &'st mut InternalServerState>, } impl<'st, C> VoteHandler<'st, C> @@ -102,19 +100,15 @@ where C: RaftTypeConfig if vote > self.state.vote_ref() { tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote); - self.state.vote.update(::Instant::now(), *vote); + self.state.vote.update(InstantOf::::now(), *vote); self.output.push_command(Command::SaveVote { vote: *vote }); } else { - self.state.vote.touch(::Instant::now()); + self.state.vote.touch(InstantOf::::now()); } // Update vote related timer and lease. - tracing::debug!( - now = debug(::Instant::now()), - "{}", - func_name!() - ); + tracing::debug!(now = debug(InstantOf::::now()), "{}", func_name!()); self.update_internal_server_state(); diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 7b6d61632..61393e69d 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -64,6 +64,8 @@ use crate::raft::trigger::Trigger; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::alias::InstantOf; +use crate::type_config::alias::JoinErrorOf; use crate::type_config::alias::SnapshotDataOf; use crate::AsyncRuntime; use crate::ChangeMembers; @@ -809,9 +811,7 @@ where C: RaftTypeConfig /// ``` pub async fn with_raft_state(&self, func: F) -> Result> where - F: FnOnce(&RaftState::Instant>) -> V - + OptionalSend - + 'static, + F: FnOnce(&RaftState>) -> V + OptionalSend + 'static, V: OptionalSend + 'static, { let (tx, rx) = C::AsyncRuntime::oneshot(); @@ -848,8 +848,7 @@ where C: RaftTypeConfig /// If the API channel is already closed (Raft is in shutdown), then the request functor is /// destroyed right away and not called at all. pub fn external_request(&self, req: F) - where F: FnOnce(&RaftState::Instant>) + OptionalSend + 'static - { + where F: FnOnce(&RaftState>) + OptionalSend + 'static { let req: BoxCoreFn = Box::new(req); let _ignore_error = self.inner.tx_api.send(RaftMsg::ExternalCoreRequest { req }); } @@ -903,7 +902,7 @@ where C: RaftTypeConfig /// Shutdown this Raft node. /// /// It sends a shutdown signal and waits until `RaftCore` returns. - pub async fn shutdown(&self) -> Result<(), ::JoinError> { + pub async fn shutdown(&self) -> Result<(), JoinErrorOf> { if let Some(tx) = self.inner.tx_shutdown.lock().await.take() { // A failure to send means the RaftCore is already shutdown. Continue to check the task // return value. diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index ed286b5ab..7d8146540 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -16,6 +16,7 @@ use crate::error::RaftError; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftServerMetrics; use crate::raft::core_state::CoreState; +use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; use crate::AsyncRuntime; use crate::Config; @@ -55,7 +56,7 @@ where C: RaftTypeConfig pub(crate) async fn call_core( &self, mes: RaftMsg, - rx: ::OneshotReceiver>, + rx: OneshotReceiverOf>, ) -> Result> where E: Debug + OptionalSend, diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 627eeb500..2788aafde 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,7 +1,7 @@ use crate::replication::request_id::RequestId; use crate::replication::ReplicationSessionId; +use crate::type_config::alias::InstantOf; use crate::utime::UTime; -use crate::AsyncRuntime; use crate::LogId; use crate::MessageSummary; use crate::NodeId; @@ -33,7 +33,7 @@ where C: RaftTypeConfig /// the target node. /// /// The result also track the time when this request is sent. - result: Result, ::Instant>, String>, + result: Result, InstantOf>, String>, /// In which session this message is sent. /// diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 0d04c2c1a..bc1822e81 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -10,8 +10,8 @@ use crate::raft_state::LogIOId; use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; +use crate::type_config::alias::InstantOf; use crate::utime::UTime; -use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::Instant; use crate::LogIdOptionExt; @@ -61,8 +61,7 @@ where /// state from stable storage. pub async fn get_initial_state( &mut self, - ) -> Result::Instant>, StorageError> - { + ) -> Result>, StorageError> { let vote = self.log_store.read_vote().await?; let vote = vote.unwrap_or_default(); @@ -152,7 +151,7 @@ where last_purged_log_id, ); - let now = ::Instant::now(); + let now = InstantOf::::now(); Ok(RaftState { committed: last_applied, diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 5038256cb..f19b17c58 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -19,6 +19,8 @@ use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::storage::StorageHelper; use crate::testing::StoreBuilder; +use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::alias::InstantOf; use crate::vote::CommittedLeaderId; use crate::AsyncRuntime; use crate::LogId; @@ -341,7 +343,7 @@ where pub async fn get_initial_state_without_init(mut store: LS, mut sm: SM) -> Result<(), StorageError> { let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; - let mut want = RaftState::::Instant>::default(); + let mut want = RaftState::>::default(); want.vote.update(initial.vote.utime().unwrap(), Vote::default()); assert_eq!(want, initial, "uninitialized state"); @@ -1170,7 +1172,7 @@ where let entries = entries.into_iter().collect::>(); let last_log_id = *entries.last().unwrap().get_log_id(); - let (tx, rx) = ::oneshot(); + let (tx, rx) = AsyncRuntimeOf::::oneshot(); let cb = LogFlushed::new(Some(last_log_id), tx); diff --git a/stores/memstore/Cargo.toml b/stores/memstore/Cargo.toml index dd8c4f414..37e4f072f 100644 --- a/stores/memstore/Cargo.toml +++ b/stores/memstore/Cargo.toml @@ -14,7 +14,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] -openraft = { path= "../../openraft", version = "0.10.0", features=["serde"] } +openraft = { path= "../../openraft", version = "0.10.0", features=["serde", "type-alias"] } serde = { workspace = true } serde_json = { workspace = true } diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 4c0e1d696..a0d06f4ba 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -11,6 +11,7 @@ use std::ops::RangeBounds; use std::sync::Arc; use std::sync::Mutex; +use openraft::alias::SnapshotDataOf; use openraft::storage::LogFlushed; use openraft::storage::LogState; use openraft::storage::RaftLogReader; @@ -23,7 +24,6 @@ use openraft::EntryPayload; use openraft::LogId; use openraft::OptionalSend; use openraft::RaftLogId; -use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -477,9 +477,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -487,7 +485,7 @@ impl RaftStateMachine for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, diff --git a/stores/rocksstore/src/lib.rs b/stores/rocksstore/src/lib.rs index 67b5084e3..324e6565e 100644 --- a/stores/rocksstore/src/lib.rs +++ b/stores/rocksstore/src/lib.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use byteorder::BigEndian; use byteorder::ReadBytesExt; use byteorder::WriteBytesExt; +use openraft::alias::SnapshotDataOf; use openraft::storage::LogFlushed; use openraft::storage::LogState; use openraft::storage::RaftLogStorage; @@ -33,7 +34,6 @@ use openraft::OptionalSend; use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; -use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -453,16 +453,14 @@ impl RaftStateMachine for RocksStateMachine { self.clone() } - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, diff --git a/stores/sledstore/src/lib.rs b/stores/sledstore/src/lib.rs index a2b39935e..d0129b60f 100644 --- a/stores/sledstore/src/lib.rs +++ b/stores/sledstore/src/lib.rs @@ -14,6 +14,7 @@ use async_std::sync::RwLock; use byteorder::BigEndian; use byteorder::ByteOrder; use byteorder::ReadBytesExt; +use openraft::alias::SnapshotDataOf; use openraft::storage::LogFlushed; use openraft::storage::LogState; use openraft::storage::RaftLogStorage; @@ -28,7 +29,6 @@ use openraft::OptionalSend; use openraft::RaftLogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; -use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -634,7 +634,7 @@ impl RaftStateMachine for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn begin_receiving_snapshot( &mut self, - ) -> Result::SnapshotData>, StorageError> { + ) -> Result>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -642,7 +642,7 @@ impl RaftStateMachine for Arc { async fn install_snapshot( &mut self, meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, + snapshot: Box>, ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 17020f476..1615a2226 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -18,7 +18,7 @@ repository = { workspace = true } [dependencies] [dev-dependencies] -openraft = { path="../openraft", version = "0.10.0" } +openraft = { path="../openraft", version = "0.10.0", features=["type-alias"] } openraft-memstore = { path= "../stores/memstore" } anyerror = { workspace = true } diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index 319661300..f3c9a75eb 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -3,9 +3,11 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::alias::InstantOf; use openraft::AsyncRuntime; use openraft::Config; use openraft::TokioRuntime; +use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -30,7 +32,7 @@ async fn enable_heartbeat() -> Result<()> { node0.runtime_config().heartbeat(true); for _i in 0..3 { - let now = ::Instant::now(); + let now = InstantOf::::now(); TokioRuntime::sleep(Duration::from_millis(500)).await; for node_id in [1, 2, 3] { diff --git a/tests/tests/metrics/t10_leader_last_ack.rs b/tests/tests/metrics/t10_leader_last_ack.rs index 4d98b2bf5..1abe2b907 100644 --- a/tests/tests/metrics/t10_leader_last_ack.rs +++ b/tests/tests/metrics/t10_leader_last_ack.rs @@ -3,9 +3,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::alias::AsyncRuntimeOf; use openraft::AsyncRuntime; use openraft::Config; -use openraft::RaftTypeConfig; use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing; @@ -41,7 +41,7 @@ async fn leader_last_ack_3_nodes() -> Result<()> { tracing::info!(log_index, "--- sleep 500 ms, the `millis` should extend"); { - <::AsyncRuntime as AsyncRuntime>::sleep(Duration::from_millis(500)).await; + AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; let greater = n0.metrics().borrow().millis_since_quorum_ack; println!("greater: {:?}", greater); @@ -70,7 +70,7 @@ async fn leader_last_ack_3_nodes() -> Result<()> { "--- sleep and heartbeat again; millis_since_quorum_ack refreshes" ); { - <::AsyncRuntime as AsyncRuntime>::sleep(Duration::from_millis(500)).await; + AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; n0.trigger().heartbeat().await?; @@ -93,7 +93,7 @@ async fn leader_last_ack_3_nodes() -> Result<()> { "--- sleep and heartbeat again; millis_since_quorum_ack does not refresh" ); { - <::AsyncRuntime as AsyncRuntime>::sleep(Duration::from_millis(500)).await; + AsyncRuntimeOf::::sleep(Duration::from_millis(500)).await; n0.trigger().heartbeat().await?;