Skip to content

Commit

Permalink
almost done updating tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-schoenberger committed Oct 15, 2023
1 parent 50510a0 commit c53f0ab
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 164 deletions.
9 changes: 5 additions & 4 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;
use std::sync::Mutex;

use openraft::async_trait::async_trait;
use openraft::raft::ExampleSnapshot;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
use openraft::storage::RaftSnapshotBuilder;
Expand Down Expand Up @@ -75,7 +76,7 @@ pub type MemNodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for `MemStore`.
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
Entry = Entry<TypeConfig>, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime
);

/// The application snapshot type which the `MemStore` works with.
Expand Down Expand Up @@ -302,7 +303,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<MemStore> {

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
})
}
}
Expand Down Expand Up @@ -424,7 +425,7 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<MemNodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
Ok(Box::new(Cursor::new(Vec::new()).into()))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
Expand Down Expand Up @@ -471,7 +472,7 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {
let data = snapshot.data.clone();
Ok(Some(Snapshot {
meta: snapshot.meta.clone(),
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
}))
}
None => Ok(None),
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/compat/compat07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,11 @@ pub mod testing {

#[cfg(test)]
mod tests {
use std::io::Cursor;

use maplit::btreemap;
use maplit::btreeset;

use crate::compat::Upgrade;
use crate::raft::ExampleSnapshot;
use crate::CommittedLeaderId;
use crate::TokioRuntime;

Expand Down Expand Up @@ -512,7 +511,7 @@ mod tests {
crate::declare_raft_types!(
pub TestingConfig:
D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode,
Entry = crate::Entry<TestingConfig>, SnapshotData = Cursor<Vec<u8>>,
Entry = crate::Entry<TestingConfig>, SnapshotData = ExampleSnapshot,
AsyncRuntime = TokioRuntime
);

Expand Down
8 changes: 4 additions & 4 deletions openraft/src/core/snapshot_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use crate::SnapshotId;
/// A global unique id of install-snapshot request.
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub(crate) struct SnapshotRequestId<NID: NodeId, C> {
pub(crate) struct SnapshotRequestId<NID: NodeId, ChunkId> {
pub(crate) leader_id: LeaderId<NID>,
pub(crate) snapshot_id: SnapshotId,
pub(crate) chunk_id: Option<C>,
pub(crate) chunk_id: Option<ChunkId>,
}

impl<NID: NodeId, C> SnapshotRequestId<NID, C> {
pub(crate) fn new(leader_id: LeaderId<NID>, snapshot_id: SnapshotId, chunk_id: Option<C>) -> Self {
impl<NID: NodeId, ChunkId> SnapshotRequestId<NID, ChunkId> {
pub(crate) fn new(leader_id: LeaderId<NID>, snapshot_id: SnapshotId, chunk_id: Option<ChunkId>) -> Self {
Self {
leader_id,
snapshot_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use pretty_assertions::assert_eq;

use crate::core::sm;
use crate::engine::testing::UTConfig;
use crate::engine::testing::VecChunkId;
use crate::engine::testing::VecManifest;
use crate::engine::testing::VecSnapshotChunk;
use crate::engine::Command;
use crate::engine::Engine;
use crate::error::InstallSnapshotError;
use crate::error::SnapshotMismatch;
use crate::raft::ExampleChunkId;
use crate::raft::ExampleManifest;
use crate::raft::ExampleSnapshotChunk;
use crate::raft::InstallSnapshotData;
use crate::raft::InstallSnapshotRequest;
use crate::raft_state::StreamingState;
Expand Down Expand Up @@ -47,8 +47,8 @@ fn make_req(offset: u64) -> InstallSnapshotRequest<UTConfig> {
InstallSnapshotRequest {
vote: Vote::new_committed(2, 1),
meta: make_meta(),
data: InstallSnapshotData::Chunk(VecSnapshotChunk {
chunk_id: VecChunkId {
data: InstallSnapshotData::Chunk(ExampleSnapshotChunk {
chunk_id: ExampleChunkId {
offset: offset as usize,
len: 0,
},
Expand All @@ -61,7 +61,7 @@ fn make_manifest() -> InstallSnapshotRequest<UTConfig> {
InstallSnapshotRequest {
vote: Vote::new_committed(2, 1),
meta: make_meta(),
data: InstallSnapshotData::Manifest(VecManifest { chunks: btreeset! {} }),
data: InstallSnapshotData::Manifest(ExampleManifest { chunks: btreeset! {} }),
}
}

Expand Down
113 changes: 2 additions & 111 deletions openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
use std::collections::BTreeSet;

use anyerror::AnyError;
use async_trait::async_trait;
use derive_more::Display;

use crate::raft::SnapshotChunk;
use crate::raft::SnapshotData;
use crate::raft::SnapshotManifest;
use crate::raft::ExampleSnapshot;
use crate::RaftTypeConfig;
use crate::TokioRuntime;

Expand All @@ -20,107 +12,6 @@ impl RaftTypeConfig for UTConfig {
type NodeId = u64;
type Node = ();
type Entry = crate::Entry<UTConfig>;
type SnapshotData = VecSnapshot;
type SnapshotData = ExampleSnapshot;
type AsyncRuntime = TokioRuntime;
}

#[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub(crate) struct VecManifest {
pub chunks: BTreeSet<VecChunkId>,
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Display)]
#[display(fmt = "(offset: {})", offset)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub(crate) struct VecChunkId {
pub offset: usize,
pub len: usize,
}

#[derive(Clone)]
pub(crate) struct VecSnapshot {
pub len: usize,
pub data: Vec<u8>,
}

#[derive(Clone, PartialEq, Eq, Debug, Display)]
#[display(fmt = "{}", chunk_id)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub(crate) struct VecSnapshotChunk {
pub chunk_id: VecChunkId,
pub data: Vec<u8>,
}

impl SnapshotChunk for VecSnapshotChunk {
type ChunkId = VecChunkId;

fn id(&self) -> Self::ChunkId {
self.chunk_id.clone()
}
}

impl SnapshotManifest for VecManifest {
type Iter = std::collections::btree_set::IntoIter<VecChunkId>;
type ChunkId = VecChunkId;

fn chunks_to_send(&self) -> Self::Iter {
self.chunks.clone().into_iter()
}

fn receive(&mut self, c: &Self::ChunkId) -> Result<bool, AnyError> {
Ok(self.chunks.remove(c))
}

fn is_complete(&self) -> bool {
self.chunks.is_empty()
}
}

#[async_trait]
impl SnapshotData for VecSnapshot {
type Chunk = VecSnapshotChunk;
type ChunkId = VecChunkId;
type Manifest = VecManifest;

async fn manifest(&self) -> Self::Manifest {
let chunks: BTreeSet<_> = self
.data
.as_slice()
.chunks(self.len)
.enumerate()
.map(|(i, c)| VecChunkId {
offset: i * self.len,
len: c.len(),
})
.collect();

VecManifest { chunks }
}

async fn get_chunk(&self, id: &Self::ChunkId) -> Result<Self::Chunk, AnyError> {
Ok(VecSnapshotChunk {
chunk_id: id.clone(),
data: self.data[id.offset..(id.offset + id.len)].to_vec(),
})
}

async fn receive(&mut self, c: Self::Chunk) -> Result<(), AnyError> {
if self.data.len() < (c.chunk_id.offset + c.chunk_id.len) {
self.data.reserve((c.chunk_id.offset + c.chunk_id.len) - self.data.len());
}

let _: Vec<_> = self.data.splice(c.chunk_id.offset..(c.chunk_id.offset + c.chunk_id.len), c.data).collect();

Ok(())
}
}

impl<'a> From<&'a [u8]> for VecSnapshot {
fn from(value: &'a [u8]) -> Self {
Self {
len: 1024,
data: value.to_vec(),
}
}
}
123 changes: 123 additions & 0 deletions openraft/src/raft/message/install_snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::io::Cursor;

use anyerror::AnyError;
use async_trait::async_trait;
use derive_more::Display;

use crate::type_config::RTCSnapshotChunk;
use crate::type_config::RTCSnapshotChunkId;
Expand Down Expand Up @@ -108,3 +111,123 @@ impl<C: RaftTypeConfig> MessageSummary<InstallSnapshotRequest<C>> for InstallSna
pub struct InstallSnapshotResponse<NID: NodeId> {
pub vote: Vote<NID>,
}

#[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct ExampleManifest {
pub chunks: BTreeSet<ExampleChunkId>,
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Display)]
#[display(fmt = "(offset: {})", offset)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct ExampleChunkId {
pub offset: usize,
pub len: usize,
}

#[derive(Clone)]
pub struct ExampleSnapshot {
pub chunk_len: usize,
pub data: Vec<u8>,
}

impl ExampleSnapshot {
pub fn into_inner(self) -> Vec<u8> {
self.data
}

pub fn get_ref(&self) -> &[u8] {
&self.data
}
}

#[derive(Clone, PartialEq, Eq, Debug, Display)]
#[display(fmt = "{}", chunk_id)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct ExampleSnapshotChunk {
pub chunk_id: ExampleChunkId,
pub data: Vec<u8>,
}

impl SnapshotChunk for ExampleSnapshotChunk {
type ChunkId = ExampleChunkId;

fn id(&self) -> Self::ChunkId {
self.chunk_id.clone()
}
}

impl SnapshotManifest for ExampleManifest {
type Iter = std::collections::btree_set::IntoIter<ExampleChunkId>;
type ChunkId = ExampleChunkId;

fn chunks_to_send(&self) -> Self::Iter {
self.chunks.clone().into_iter()
}

fn receive(&mut self, c: &Self::ChunkId) -> Result<bool, AnyError> {
Ok(self.chunks.remove(c))
}

fn is_complete(&self) -> bool {
self.chunks.is_empty()
}
}

#[async_trait]
impl SnapshotData for ExampleSnapshot {
type Chunk = ExampleSnapshotChunk;
type ChunkId = ExampleChunkId;
type Manifest = ExampleManifest;

async fn manifest(&self) -> Self::Manifest {
let chunks: BTreeSet<_> = self
.data
.as_slice()
.chunks(self.chunk_len)
.enumerate()
.map(|(i, c)| ExampleChunkId {
offset: i * self.chunk_len,
len: c.len(),
})
.collect();

ExampleManifest { chunks }
}

async fn get_chunk(&self, id: &Self::ChunkId) -> Result<Self::Chunk, AnyError> {
Ok(ExampleSnapshotChunk {
chunk_id: id.clone(),
data: self.data[id.offset..(id.offset + id.len)].to_vec(),
})
}

async fn receive(&mut self, c: Self::Chunk) -> Result<(), AnyError> {
if self.data.len() < (c.chunk_id.offset + c.chunk_id.len) {
self.data.extend_from_slice(&vec![0; (c.chunk_id.offset + c.chunk_id.len) - self.data.len()]);
}

let _: Vec<_> = self.data.splice(c.chunk_id.offset..(c.chunk_id.offset + c.chunk_id.len), c.data).collect();

Ok(())
}
}

impl<'a> From<&'a [u8]> for ExampleSnapshot {
fn from(value: &'a [u8]) -> Self {
Self {
chunk_len: 1024,
data: value.to_vec(),
}
}
}

impl From<Cursor<Vec<u8>>> for ExampleSnapshot {
fn from(value: Cursor<Vec<u8>>) -> Self {
Self {
chunk_len: 1024,
data: value.into_inner(),
}
}
}
4 changes: 4 additions & 0 deletions openraft/src/raft/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ mod client_write;
pub use append_entries::AppendEntriesRequest;
pub use append_entries::AppendEntriesResponse;
pub use client_write::ClientWriteResponse;
pub use install_snapshot::ExampleChunkId;
pub use install_snapshot::ExampleManifest;
pub use install_snapshot::ExampleSnapshot;
pub use install_snapshot::ExampleSnapshotChunk;
pub use install_snapshot::InstallSnapshotData;
pub use install_snapshot::InstallSnapshotRequest;
pub use install_snapshot::InstallSnapshotResponse;
Expand Down
Loading

0 comments on commit c53f0ab

Please sign in to comment.