diff --git a/examples/raft-kv-rocksdb/Cargo.toml b/examples/raft-kv-rocksdb/Cargo.toml index 94f442466..4133295a4 100644 --- a/examples/raft-kv-rocksdb/Cargo.toml +++ b/examples/raft-kv-rocksdb/Cargo.toml @@ -21,7 +21,7 @@ name = "raft-key-value-rocks" path = "src/bin/main.rs" [dependencies] -openraft = { path = "../../openraft", features = ["serde"] } +openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } async-trait = "0.1.36" diff --git a/examples/raft-kv-rocksdb/src/app.rs b/examples/raft-kv-rocksdb/src/app.rs index 87925531e..84cafd764 100644 --- a/examples/raft-kv-rocksdb/src/app.rs +++ b/examples/raft-kv-rocksdb/src/app.rs @@ -1,10 +1,11 @@ +use std::collections::BTreeMap; use std::sync::Arc; +use async_std::sync::RwLock; use openraft::Config; use crate::ExampleRaft; use crate::NodeId; -use crate::Store; // Representation of an application state. This struct can be shared around to share // instances of raft, store and more. @@ -13,6 +14,6 @@ pub struct App { pub api_addr: String, pub rcp_addr: String, pub raft: ExampleRaft, - pub store: Arc, + pub key_values: Arc>>, pub config: Arc, } diff --git a/examples/raft-kv-rocksdb/src/client.rs b/examples/raft-kv-rocksdb/src/client.rs index 485d36abd..92feb7646 100644 --- a/examples/raft-kv-rocksdb/src/client.rs +++ b/examples/raft-kv-rocksdb/src/client.rs @@ -2,15 +2,9 @@ use std::collections::BTreeSet; use std::sync::Arc; use std::sync::Mutex; -use openraft::error::CheckIsLeaderError; -use openraft::error::ClientWriteError; -use openraft::error::ForwardToLeader; -use openraft::error::InitializeError; use openraft::error::NetworkError; use openraft::error::RPCError; -use openraft::error::RaftError; use openraft::error::RemoteError; -use openraft::raft::ClientWriteResponse; use openraft::RaftMetrics; use openraft::TryAsRef; use reqwest::Client; @@ -18,10 +12,10 @@ use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; +use crate::typ; use crate::Node; use crate::NodeId; use crate::Request; -use crate::TypeConfig; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Empty {} @@ -52,30 +46,21 @@ impl ExampleClient { /// will be applied to state machine. /// /// The result of applying the request will be returned. - pub async fn write( - &self, - req: &Request, - ) -> Result< - ClientWriteResponse, - RPCError>>, - > { + pub async fn write(&self, req: &Request) -> Result> { self.send_rpc_to_leader("api/write", Some(req)).await } /// Read value by key, in an inconsistent mode. /// /// This method may return stale value because it does not force to read on a legal leader. - pub async fn read(&self, req: &String) -> Result>> { + pub async fn read(&self, req: &String) -> Result { self.do_send_rpc_to_leader("api/read", Some(req)).await } /// Consistent Read value by key, in an inconsistent mode. /// /// This method MUST return consistent value or CheckIsLeaderError. - pub async fn consistent_read( - &self, - req: &String, - ) -> Result>>> { + pub async fn consistent_read(&self, req: &String) -> Result> { self.do_send_rpc_to_leader("api/consistent_read", Some(req)).await } @@ -87,7 +72,7 @@ impl ExampleClient { /// With a initialized cluster, new node can be added with [`write`]. /// Then setup replication with [`add_learner`]. /// Then make the new node a member with [`change_membership`]. - pub async fn init(&self) -> Result<(), RPCError>>> { + pub async fn init(&self) -> Result<(), typ::RPCError> { self.do_send_rpc_to_leader("cluster/init", Some(&Empty {})).await } @@ -97,10 +82,7 @@ impl ExampleClient { pub async fn add_learner( &self, req: (NodeId, String, String), - ) -> Result< - ClientWriteResponse, - RPCError>>, - > { + ) -> Result> { self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await } @@ -111,10 +93,7 @@ impl ExampleClient { pub async fn change_membership( &self, req: &BTreeSet, - ) -> Result< - ClientWriteResponse, - RPCError>>, - > { + ) -> Result> { self.send_rpc_to_leader("cluster/change-membership", Some(req)).await } @@ -123,7 +102,7 @@ impl ExampleClient { /// Metrics contains various information about the cluster, such as current leader, /// membership config, replication status etc. /// See [`RaftMetrics`]. - pub async fn metrics(&self) -> Result, RPCError>> { + pub async fn metrics(&self) -> Result, typ::RPCError> { self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await } @@ -179,22 +158,17 @@ impl ExampleClient { /// /// If the target node is not a leader, a `ForwardToLeader` error will be /// returned and this client will retry at most 3 times to contact the updated leader. - async fn send_rpc_to_leader( - &self, - uri: &str, - req: Option<&Req>, - ) -> Result>> + async fn send_rpc_to_leader(&self, uri: &str, req: Option<&Req>) -> Result> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, - Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef> + Clone, + Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef + Clone, { // Retry at most 3 times to find a valid leader. let mut n_retry = 3; loop { - let res: Result>> = - self.do_send_rpc_to_leader(uri, req).await; + let res: Result> = self.do_send_rpc_to_leader(uri, req).await; let rpc_err = match res { Ok(x) => return Ok(x), @@ -202,9 +176,9 @@ impl ExampleClient { }; if let RPCError::RemoteError(remote_err) = &rpc_err { - let raft_err: &RaftError = &remote_err.source; + let raft_err: &typ::RaftError<_> = &remote_err.source; - if let Some(ForwardToLeader { + if let Some(typ::ForwardToLeader { leader_id: Some(leader_id), leader_node: Some(leader_node), .. diff --git a/examples/raft-kv-rocksdb/src/lib.rs b/examples/raft-kv-rocksdb/src/lib.rs index 8ae664518..72349219f 100644 --- a/examples/raft-kv-rocksdb/src/lib.rs +++ b/examples/raft-kv-rocksdb/src/lib.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use async_std::net::TcpListener; use async_std::task; -use openraft::storage::Adaptor; use openraft::Config; use openraft::TokioRuntime; @@ -16,9 +15,9 @@ use crate::app::App; use crate::network::api; use crate::network::management; use crate::network::Network; +use crate::store::new_storage; use crate::store::Request; use crate::store::Response; -use crate::store::Store; pub mod app; pub mod client; @@ -39,14 +38,39 @@ impl Display for Node { } } +pub type SnapshotData = Cursor>; + openraft::declare_raft_types!( - /// Declare the type configuration for example K/V store. - pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node, - Entry = openraft::Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + pub TypeConfig: + D = Request, + R = Response, + NodeId = NodeId, + Node = Node, + Entry = openraft::Entry, + SnapshotData = SnapshotData, + AsyncRuntime = TokioRuntime ); -pub type LogStore = Adaptor>; -pub type StateMachineStore = Adaptor>; +pub mod typ { + use openraft::error::Infallible; + + use crate::Node; + use crate::NodeId; + use crate::TypeConfig; + + pub type Entry = openraft::Entry; + + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError>; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + pub type ExampleRaft = openraft::Raft; type Server = tide::Server>; @@ -69,24 +93,23 @@ where let config = Arc::new(config.validate().unwrap()); - // Create a instance of where the Raft data will be stored. - let store = Store::new(&dir).await; + let (log_store, state_machine_store) = new_storage(&dir).await; - let (log_store, state_machine) = Adaptor::new(store.clone()); + let kvs = state_machine_store.data.kvs.clone(); // Create the network layer that will connect and communicate the raft instances and // will be used in conjunction with the store created above. let network = Network {}; // Create a local raft instance. - let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine).await.unwrap(); + let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await.unwrap(); let app = Arc::new(App { id: node_id, api_addr: http_addr.clone(), rcp_addr: rcp_addr.clone(), raft, - store, + key_values: kvs, config, }); diff --git a/examples/raft-kv-rocksdb/src/network/api.rs b/examples/raft-kv-rocksdb/src/network/api.rs index 10bd5e6ab..53e3f6485 100644 --- a/examples/raft-kv-rocksdb/src/network/api.rs +++ b/examples/raft-kv-rocksdb/src/network/api.rs @@ -35,10 +35,10 @@ async fn write(mut req: Request>) -> tide::Result { async fn read(mut req: Request>) -> tide::Result { let key: String = req.body_json().await?; - let state_machine = req.state().store.state_machine.read().await; - let value = state_machine.get(&key)?; + let kvs = req.state().key_values.read().await; + let value = kvs.get(&key); - let res: Result = Ok(value.unwrap_or_default()); + let res: Result = Ok(value.cloned().unwrap_or_default()); Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build()) } @@ -48,11 +48,11 @@ async fn consistent_read(mut req: Request>) -> tide::Result { match ret { Ok(_) => { let key: String = req.body_json().await?; - let state_machine = req.state().store.state_machine.read().await; + let kvs = req.state().key_values.read().await; - let value = state_machine.get(&key)?; + let value = kvs.get(&key); - let res: Result> = Ok(value.unwrap_or_default()); + let res: Result> = Ok(value.cloned().unwrap_or_default()); Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build()) } e => Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&e)?).build()), diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 970ae7441..9d75e1f14 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -1,5 +1,4 @@ use std::collections::BTreeMap; -use std::error::Error; use std::fmt::Debug; use std::io::Cursor; use std::ops::RangeBounds; @@ -11,7 +10,10 @@ use byteorder::BigEndian; use byteorder::ReadBytesExt; use byteorder::WriteBytesExt; use openraft::async_trait::async_trait; +use openraft::storage::LogFlushed; use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; use openraft::AnyError; use openraft::Entry; @@ -19,10 +21,9 @@ use openraft::EntryPayload; use openraft::ErrorSubject; use openraft::ErrorVerb; use openraft::LogId; +use openraft::OptionalSend; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; -use openraft::RaftStorage; -use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -36,8 +37,10 @@ use rocksdb::DB; use serde::Deserialize; use serde::Serialize; +use crate::typ; use crate::Node; use crate::NodeId; +use crate::SnapshotData; use crate::TypeConfig; /** @@ -64,7 +67,7 @@ pub struct Response { pub value: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct StoredSnapshot { pub meta: SnapshotMeta, @@ -72,135 +75,210 @@ pub struct StoredSnapshot { pub data: Vec, } -/** - * Here defines a state machine of the raft, this state represents a copy of the data - * between each node. Note that we are using `serde` to serialize the `data`, which has - * a implementation to be serialized. Note that for this test we set both the key and - * value as String, but you could set any type of value that has the serialization impl. - */ -#[derive(Serialize, Deserialize, Debug, Default, Clone)] -pub struct SerializableExampleStateMachine { - pub last_applied_log: Option>, +#[derive(Debug, Clone)] +pub struct StateMachineStore { + pub data: StateMachineData, + + /// snapshot index is not persisted in this example. + /// + /// It is only used as a suffix of snapshot id, and should be globally unique. + /// In practice, using a timestamp in micro-second would be good enough. + snapshot_idx: u64, + + /// State machine stores snapshot in db. + db: Arc, +} + +#[derive(Debug, Clone)] +pub struct StateMachineData { + pub last_applied_log_id: Option>, pub last_membership: StoredMembership, - /// Application data. - pub data: BTreeMap, + /// State built from applying the raft logs + pub kvs: Arc>>, } -impl From<&StateMachine> for SerializableExampleStateMachine { - fn from(state: &StateMachine) -> Self { - let mut data = BTreeMap::new(); - for res in state.db.iterator_cf( - state.db.cf_handle("data").expect("cf_handle"), - rocksdb::IteratorMode::Start, - ) { - let (key, value) = res.unwrap(); - let key: &[u8] = &key; - let value: &[u8] = &value; - data.insert( - String::from_utf8(key.to_vec()).expect("invalid key"), - String::from_utf8(value.to_vec()).expect("invalid data"), - ); - } - Self { - last_applied_log: state.get_last_applied_log().expect("last_applied_log"), - last_membership: state.get_last_membership().expect("last_membership"), - data, - } +#[async_trait] +impl RaftSnapshotBuilder for StateMachineStore { + async fn build_snapshot(&mut self) -> Result, StorageError> { + let last_applied_log = self.data.last_applied_log_id.clone(); + let last_membership = self.data.last_membership.clone(); + + let kv_json = { + let kvs = self.data.kvs.read().await; + serde_json::to_vec(&*kvs).map_err(|e| StorageIOError::read_state_machine(&e))? + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, self.snapshot_idx) + } else { + format!("--{}", self.snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: kv_json.clone(), + }; + + self.set_current_snapshot_(snapshot)?; + + Ok(Snapshot { + meta, + snapshot: Box::new(Cursor::new(kv_json)), + }) } } -#[derive(Debug, Clone)] -pub struct StateMachine { - /// Application data. - pub db: Arc, -} +impl StateMachineStore { + async fn new(db: Arc) -> Result> { + let mut sm = Self { + data: StateMachineData { + last_applied_log_id: None, + last_membership: Default::default(), + kvs: Arc::new(Default::default()), + }, + snapshot_idx: 0, + db, + }; -fn sm_r_err(e: E) -> StorageError { - StorageIOError::read_state_machine(&e).into() -} -fn sm_w_err(e: E) -> StorageError { - StorageIOError::write(&e).into() -} + let snapshot = sm.get_current_snapshot_()?; + if let Some(snap) = snapshot { + sm.update_state_machine_(snap).await?; + } -impl StateMachine { - fn get_last_membership(&self) -> StorageResult> { - self.db - .get_cf( - self.db.cf_handle("state_machine").expect("cf_handle"), - "last_membership".as_bytes(), - ) - .map_err(sm_r_err) - .and_then(|value| { - value - .map(|v| serde_json::from_slice(&v).map_err(sm_r_err)) - .unwrap_or_else(|| Ok(StoredMembership::default())) - }) + Ok(sm) } - fn set_last_membership(&self, membership: StoredMembership) -> StorageResult<()> { - self.db - .put_cf( - self.db.cf_handle("state_machine").expect("cf_handle"), - "last_membership".as_bytes(), - serde_json::to_vec(&membership).map_err(sm_w_err)?, - ) - .map_err(sm_w_err) + + async fn update_state_machine_(&mut self, snapshot: StoredSnapshot) -> Result<(), StorageError> { + let kvs: BTreeMap = serde_json::from_slice(&snapshot.data) + .map_err(|e| StorageIOError::read_snapshot(Some(snapshot.meta.signature()), &e))?; + + self.data.last_applied_log_id = snapshot.meta.last_log_id.clone(); + self.data.last_membership = snapshot.meta.last_membership.clone(); + let mut x = self.data.kvs.write().await; + *x = kvs; + + Ok(()) } - fn get_last_applied_log(&self) -> StorageResult>> { - self.db - .get_cf( - self.db.cf_handle("state_machine").expect("cf_handle"), - "last_applied_log".as_bytes(), - ) - .map_err(sm_r_err) - .and_then(|value| value.map(|v| serde_json::from_slice(&v).map_err(sm_r_err)).transpose()) + + fn get_current_snapshot_(&self) -> StorageResult> { + Ok(self + .db + .get_cf(self.store(), b"snapshot") + .map_err(|e| StorageError::IO { + source: StorageIOError::read(&e), + })? + .and_then(|v| serde_json::from_slice(&v).ok())) } - fn set_last_applied_log(&self, log_id: LogId) -> StorageResult<()> { + + fn set_current_snapshot_(&self, snap: StoredSnapshot) -> StorageResult<()> { self.db - .put_cf( - self.db.cf_handle("state_machine").expect("cf_handle"), - "last_applied_log".as_bytes(), - serde_json::to_vec(&log_id).map_err(sm_w_err)?, - ) - .map_err(sm_w_err) + .put_cf(self.store(), b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice()) + .map_err(|e| StorageError::IO { + source: StorageIOError::write_snapshot(Some(snap.meta.signature()), &e), + })?; + self.flush(ErrorSubject::Snapshot(Some(snap.meta.signature())), ErrorVerb::Write)?; + Ok(()) } - fn from_serializable(sm: SerializableExampleStateMachine, db: Arc) -> StorageResult { - for (key, value) in sm.data { - db.put_cf(db.cf_handle("data").unwrap(), key.as_bytes(), value.as_bytes()).map_err(sm_w_err)?; - } - let r = Self { db }; - if let Some(log_id) = sm.last_applied_log { - r.set_last_applied_log(log_id)?; + + fn flush(&self, subject: ErrorSubject, verb: ErrorVerb) -> Result<(), StorageIOError> { + self.db.flush_wal(true).map_err(|e| StorageIOError::new(subject, verb, AnyError::new(&e)))?; + Ok(()) + } + + fn store(&self) -> &ColumnFamily { + self.db.cf_handle("store").unwrap() + } +} + +#[async_trait] +impl RaftStateMachine for StateMachineStore { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + Ok((self.data.last_applied_log_id, self.data.last_membership.clone())) + } + + async fn apply(&mut self, entries: I) -> Result, StorageError> + where + I: IntoIterator + OptionalSend, + I::IntoIter: OptionalSend, + { + let entries = entries.into_iter(); + let mut replies = Vec::with_capacity(entries.size_hint().0); + + for ent in entries { + self.data.last_applied_log_id = Some(ent.log_id); + + let mut resp_value = None; + + match ent.payload { + EntryPayload::Blank => {} + EntryPayload::Normal(req) => match req { + Request::Set { key, value } => { + resp_value = Some(value.clone()); + + let mut st = self.data.kvs.write().await; + st.insert(key, value); + } + }, + EntryPayload::Membership(mem) => { + self.data.last_membership = StoredMembership::new(Some(ent.log_id), mem); + } + } + + replies.push(Response { value: resp_value }); } - r.set_last_membership(sm.last_membership)?; + Ok(replies) + } - Ok(r) + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.snapshot_idx += 1; + self.clone() } - fn new(db: Arc) -> StateMachine { - Self { db } + async fn begin_receiving_snapshot(&mut self) -> Result>>, StorageError> { + Ok(Box::new(Cursor::new(Vec::new()))) } - fn insert(&self, key: String, value: String) -> StorageResult<()> { - self.db - .put_cf(self.db.cf_handle("data").unwrap(), key.as_bytes(), value.as_bytes()) - .map_err(|e| StorageIOError::write(&e).into()) + + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result<(), StorageError> { + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + self.update_state_machine_(new_snapshot.clone()).await?; + + self.set_current_snapshot_(new_snapshot)?; + + Ok(()) } - pub fn get(&self, key: &str) -> StorageResult> { - let key = key.as_bytes(); - self.db - .get_cf(self.db.cf_handle("data").unwrap(), key) - .map(|value| value.map(|value| String::from_utf8(value.to_vec()).expect("invalid data"))) - .map_err(|e| StorageIOError::read(&e).into()) + + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + let x = self.get_current_snapshot_()?; + Ok(x.map(|s| Snapshot { + meta: s.meta.clone(), + snapshot: Box::new(Cursor::new(s.data.clone())), + })) } } -#[derive(Debug)] -pub struct Store { +#[derive(Debug, Clone)] +pub struct LogStore { db: Arc, - - /// The Raft state machine. - pub state_machine: RwLock, } type StorageResult = Result>; @@ -216,7 +294,7 @@ fn bin_to_id(buf: &[u8]) -> u64 { (&buf[0..8]).read_u64::().unwrap() } -impl Store { +impl LogStore { fn store(&self) -> &ColumnFamily { self.db.cf_handle("store").unwrap() } @@ -251,29 +329,6 @@ impl Store { Ok(()) } - fn get_snapshot_index_(&self) -> StorageResult { - Ok(self - .db - .get_cf(self.store(), b"snapshot_index") - .map_err(|e| StorageIOError::read(&e))? - .and_then(|v| serde_json::from_slice(&v).ok()) - .unwrap_or(0)) - } - - fn set_snapshot_indesx_(&self, snapshot_index: u64) -> StorageResult<()> { - self.db - .put_cf( - self.store(), - b"snapshot_index", - serde_json::to_vec(&snapshot_index).unwrap().as_slice(), - ) - .map_err(|e| StorageError::IO { - source: StorageIOError::write(&e), - })?; - self.flush(ErrorSubject::Store, ErrorVerb::Write)?; - Ok(()) - } - fn set_vote_(&self, vote: &Vote) -> StorageResult<()> { self.db .put_cf(self.store(), b"vote", serde_json::to_vec(vote).unwrap()) @@ -294,30 +349,10 @@ impl Store { })? .and_then(|v| serde_json::from_slice(&v).ok())) } - - fn get_current_snapshot_(&self) -> StorageResult> { - Ok(self - .db - .get_cf(self.store(), b"snapshot") - .map_err(|e| StorageError::IO { - source: StorageIOError::read(&e), - })? - .and_then(|v| serde_json::from_slice(&v).ok())) - } - - fn set_current_snapshot_(&self, snap: StoredSnapshot) -> StorageResult<()> { - self.db - .put_cf(self.store(), b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice()) - .map_err(|e| StorageError::IO { - source: StorageIOError::write_snapshot(Some(snap.meta.signature()), &e), - })?; - self.flush(ErrorSubject::Snapshot(Some(snap.meta.signature())), ErrorVerb::Write)?; - Ok(()) - } } #[async_trait] -impl RaftLogReader for Arc { +impl RaftLogReader for LogStore { async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -346,56 +381,8 @@ impl RaftLogReader for Arc { } #[async_trait] -impl RaftSnapshotBuilder for Arc { - #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result, StorageError> { - let data; - let last_applied_log; - let last_membership; - - { - // Serialize the data of the state machine. - let state_machine = SerializableExampleStateMachine::from(&*self.state_machine.read().await); - data = serde_json::to_vec(&state_machine).map_err(|e| StorageIOError::read_state_machine(&e))?; - - last_applied_log = state_machine.last_applied_log; - last_membership = state_machine.last_membership; - } - - // TODO: we probably want this to be atomic. - let snapshot_idx: u64 = self.get_snapshot_index_()? + 1; - self.set_snapshot_indesx_(snapshot_idx)?; - - let snapshot_id = if let Some(last) = last_applied_log { - format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) - } else { - format!("--{}", snapshot_idx) - }; - - let meta = SnapshotMeta { - last_log_id: last_applied_log, - last_membership, - snapshot_id, - }; - - let snapshot = StoredSnapshot { - meta: meta.clone(), - data: data.clone(), - }; - - self.set_current_snapshot_(snapshot)?; - - Ok(Snapshot { - meta, - snapshot: Box::new(Cursor::new(data)), - }) - } -} - -#[async_trait] -impl RaftStorage for Arc { +impl RaftLogStorage for LogStore { type LogReader = Self; - type SnapshotBuilder = Self; async fn get_log_state(&mut self) -> StorageResult> { let last = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::End).next().and_then(|res| { @@ -424,9 +411,12 @@ impl RaftStorage for Arc { self.get_vote_() } - #[tracing::instrument(level = "trace", skip(self, entries))] - async fn append_to_log(&mut self, entries: I) -> StorageResult<()> - where I: IntoIterator> + Send { + #[tracing::instrument(level = "trace", skip_all)] + async fn append(&mut self, entries: I, callback: LogFlushed) -> StorageResult<()> + where + I: IntoIterator> + Send, + I::IntoIter: Send, + { for entry in entries { let id = id_to_bin(entry.log_id.index); assert_eq!(bin_to_id(&id), entry.log_id.index); @@ -439,11 +429,13 @@ impl RaftStorage for Arc { .map_err(|e| StorageIOError::write_logs(&e))?; } + callback.log_io_completed(Ok(())); + Ok(()) } #[tracing::instrument(level = "debug", skip(self))] - async fn delete_conflict_logs_since(&mut self, log_id: LogId) -> StorageResult<()> { + async fn truncate(&mut self, log_id: LogId) -> StorageResult<()> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); let from = id_to_bin(log_id.index); @@ -452,7 +444,7 @@ impl RaftStorage for Arc { } #[tracing::instrument(level = "debug", skip(self))] - async fn purge_logs_upto(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [0, {:?}]", log_id); self.set_last_purged_(log_id)?; @@ -461,124 +453,24 @@ impl RaftStorage for Arc { self.db.delete_range_cf(self.logs(), &from, &to).map_err(|e| StorageIOError::write_logs(&e).into()) } - async fn last_applied_state( - &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { - let state_machine = self.state_machine.read().await; - Ok(( - state_machine.get_last_applied_log()?, - state_machine.get_last_membership()?, - )) - } - - #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply_to_state_machine( - &mut self, - entries: &[Entry], - ) -> Result, StorageError> { - let mut res = Vec::with_capacity(entries.len()); - - let sm = self.state_machine.write().await; - - for entry in entries { - tracing::debug!(%entry.log_id, "replicate to sm"); - - sm.set_last_applied_log(entry.log_id)?; - - match entry.payload { - EntryPayload::Blank => res.push(Response { value: None }), - EntryPayload::Normal(ref req) => match req { - Request::Set { key, value } => { - sm.insert(key.clone(), value.clone())?; - res.push(Response { - value: Some(value.clone()), - }) - } - }, - EntryPayload::Membership(ref mem) => { - sm.set_last_membership(StoredMembership::new(Some(entry.log_id), mem.clone()))?; - - res.push(Response { value: None }) - } - }; - } - - self.flush(ErrorSubject::StateMachine, ErrorVerb::Write)?; - Ok(res) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() } +} - #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, - ) -> Result<(), StorageError> { - tracing::info!( - { snapshot_size = snapshot.get_ref().len() }, - "decoding snapshot for installation" - ); - - let new_snapshot = StoredSnapshot { - meta: meta.clone(), - data: snapshot.into_inner(), - }; - - // Update the state machine. - { - let updated_state_machine: SerializableExampleStateMachine = serde_json::from_slice(&new_snapshot.data) - .map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?; - let mut state_machine = self.state_machine.write().await; - *state_machine = StateMachine::from_serializable(updated_state_machine, self.db.clone())?; - } +pub(crate) async fn new_storage>(db_path: P) -> (LogStore, StateMachineStore) { + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); - self.set_current_snapshot_(new_snapshot)?; - Ok(()) - } + let store = ColumnFamilyDescriptor::new("store", Options::default()); + let logs = ColumnFamilyDescriptor::new("logs", Options::default()); - #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&mut self) -> Result>, StorageError> { - match Store::get_current_snapshot_(self)? { - Some(snapshot) => { - let data = snapshot.data.clone(); - Ok(Some(Snapshot { - meta: snapshot.meta, - snapshot: Box::new(Cursor::new(data)), - })) - } - None => Ok(None), - } - } + let db = DB::open_cf_descriptors(&db_opts, db_path, vec![store, logs]).unwrap(); + let db = Arc::new(db); - async fn get_log_reader(&mut self) -> Self::LogReader { - self.clone() - } + let log_store = LogStore { db: db.clone() }; + let sm_store = StateMachineStore::new(db).await.unwrap(); - async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { - self.clone() - } -} -impl Store { - pub(crate) async fn new>(db_path: P) -> Arc { - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - - let store = ColumnFamilyDescriptor::new("store", Options::default()); - let state_machine = ColumnFamilyDescriptor::new("state_machine", Options::default()); - let data = ColumnFamilyDescriptor::new("data", Options::default()); - let logs = ColumnFamilyDescriptor::new("logs", Options::default()); - - let db = DB::open_cf_descriptors(&db_opts, db_path, vec![store, state_machine, data, logs]).unwrap(); - - let db = Arc::new(db); - let state_machine = RwLock::new(StateMachine::new(db.clone())); - Arc::new(Store { db, state_machine }) - } + (log_store, sm_store) }