Skip to content

Commit

Permalink
moving all trait types under snapshotdata
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-schoenberger committed Jul 23, 2023
1 parent 2562ff2 commit 50510a0
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 33 deletions.
13 changes: 8 additions & 5 deletions openraft/src/core/streaming_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::raft::SnapshotChunk;
use crate::raft::SnapshotData;
use crate::raft::SnapshotManifest;
use crate::type_config::RTCSnapshotChunk;
use crate::type_config::RTCSnapshotData;
use crate::type_config::RTCSnapshotManifest;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::RaftTypeConfig;
Expand All @@ -16,18 +19,18 @@ where C: RaftTypeConfig
pub(crate) snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,

/// A handle to the snapshot writer.
pub(crate) streaming_data: Box<C::SnapshotData>,
pub(crate) streaming_data: Box<RTCSnapshotData<C>>,

pub(crate) manifest: C::SnapshotManifest,
pub(crate) manifest: RTCSnapshotManifest<C>,
}

impl<C> Streaming<C>
where C: RaftTypeConfig
{
pub(crate) fn new(
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
manifest: C::SnapshotManifest,
streaming_data: Box<C::SnapshotData>,
manifest: RTCSnapshotManifest<C>,
streaming_data: Box<RTCSnapshotData<C>>,
) -> Self {
Self {
snapshot_meta,
Expand All @@ -37,7 +40,7 @@ where C: RaftTypeConfig
}

/// Receive a chunk of snapshot data. Returns true if it was a new chunk
pub(crate) async fn receive(&mut self, chunk: C::SnapshotChunk) -> Result<bool, StorageError<C::NodeId>> {
pub(crate) async fn receive(&mut self, chunk: RTCSnapshotChunk<C>) -> Result<bool, StorageError<C::NodeId>> {
let chunk_id = chunk.id();
let err_x = || {
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn make_req(offset: u64) -> InstallSnapshotRequest<UTConfig> {
InstallSnapshotRequest {
vote: Vote::new_committed(2, 1),
meta: make_meta(),
data: InstallSnapshotData::chunk(VecSnapshotChunk {
data: InstallSnapshotData::Chunk(VecSnapshotChunk {
chunk_id: VecChunkId {
offset: offset as usize,
len: 0,
Expand Down
3 changes: 0 additions & 3 deletions openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ impl RaftTypeConfig for UTConfig {
type Node = ();
type Entry = crate::Entry<UTConfig>;
type SnapshotData = VecSnapshot;
type SnapshotChunk = VecSnapshotChunk;
type SnapshotChunkId = VecChunkId;
type SnapshotManifest = VecManifest;
type AsyncRuntime = TokioRuntime;
}

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft/message/client_write.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;

use crate::AppDataResponse;
#[cfg(feature = "serde")] use crate::AppDataResponse;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
Expand Down
24 changes: 11 additions & 13 deletions openraft/src/raft/message/install_snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::fmt::Debug;
use std::fmt::Display;

use anyerror::AnyError;
use async_trait::async_trait;

use crate::type_config::RTCSnapshotChunk;
use crate::type_config::RTCSnapshotChunkId;
use crate::type_config::RTCSnapshotManifest;
use crate::MessageSummary;
use crate::NodeId;
use crate::OptionalSerde;
Expand All @@ -27,20 +33,12 @@ pub struct InstallSnapshotRequest<C: RaftTypeConfig> {
#[derive(PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum InstallSnapshotData<C: RaftTypeConfig> {
Manifest(C::SnapshotManifest),
Chunk(C::SnapshotChunk),
Manifest(RTCSnapshotManifest<C>),
Chunk(RTCSnapshotChunk<C>),
}

impl<C: RaftTypeConfig> InstallSnapshotData<C> {
pub fn chunk(data: C::SnapshotChunk) -> Self {
Self::Chunk(data)
}

pub fn manifest(manifest: C::SnapshotManifest) -> Self {
Self::Manifest(manifest)
}

pub fn chunk_id(&self) -> Option<<C::SnapshotChunk as SnapshotChunk>::ChunkId> {
pub fn chunk_id(&self) -> Option<RTCSnapshotChunkId<C>> {
match self {
Self::Manifest(_) => None,
Self::Chunk(c) => Some(c.id()),
Expand Down Expand Up @@ -74,9 +72,9 @@ pub trait SnapshotManifest: Clone + Send + Sync + Default + PartialEq + Optional

#[async_trait]
pub trait SnapshotData: Send + Sync {
type ChunkId: Eq + PartialEq + Send + Sync + Display + Debug + OptionalSerde + 'static;
type Chunk: SnapshotChunk<ChunkId = Self::ChunkId>;
type ChunkId;
type Manifest: SnapshotManifest;
type Manifest: SnapshotManifest<ChunkId = Self::ChunkId>;

// Generate the manifest for this snapshot. The manifest should be able to keep track of all
// the chunks to send or receive
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::raft::SnapshotManifest;
use crate::storage::RaftLogReader;
use crate::storage::RaftLogStorage;
use crate::storage::Snapshot;
use crate::type_config::RTCSnapshotManifest;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ErrorSubject;
Expand Down Expand Up @@ -640,7 +641,7 @@ where

let err_x = || (ErrorSubject::Snapshot(Some(snapshot.meta.signature())), ErrorVerb::Read);

let mut manifest: C::SnapshotManifest = snapshot.snapshot.manifest().await;
let mut manifest: RTCSnapshotManifest<C> = snapshot.snapshot.manifest().await;
let leader_time = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
let snap_timeout = self.config.send_snapshot_timeout();

Expand Down
19 changes: 10 additions & 9 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use std::fmt::Debug;
use std::fmt::Display;

use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::raft::SnapshotChunk;
use crate::raft::SnapshotData;
use crate::raft::SnapshotManifest;
use crate::AppData;
use crate::AppDataResponse;
use crate::AsyncRuntime;
use crate::Node;
use crate::NodeId;
use crate::OptionalSerde;

/// Configuration of types used by the [`Raft`] core engine.
///
Expand Down Expand Up @@ -57,18 +53,23 @@ pub trait RaftTypeConfig:
/// Raft log entry, which can be built from an AppData.
type Entry: RaftEntry<Self::NodeId, Self::Node> + FromAppData<Self::D>;

type SnapshotChunkId: Eq + PartialEq + Send + Sync + Display + Debug + OptionalSerde + 'static;
// type SnapshotChunkId: Eq + PartialEq + Send + Sync + Display + Debug + OptionalSerde + 'static;

type SnapshotChunk: SnapshotChunk<ChunkId = Self::SnapshotChunkId> + Debug + 'static;
// type SnapshotChunk: SnapshotChunk<ChunkId = Self::SnapshotChunkId> + Debug + 'static;

type SnapshotManifest: SnapshotManifest<ChunkId = <Self::SnapshotChunk as SnapshotChunk>::ChunkId> + 'static;
// type SnapshotManifest: SnapshotManifest<ChunkId = <Self::SnapshotChunk as
// SnapshotChunk>::ChunkId> + 'static;
/// Snapshot data for exposing a snapshot for reading & writing.
///
/// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage)
/// for details on where and how this is used.
type SnapshotData: SnapshotData<ChunkId = Self::SnapshotChunkId, Manifest = Self::SnapshotManifest, Chunk = Self::SnapshotChunk>
+ 'static;
type SnapshotData: SnapshotData + 'static;

/// Asynchronous runtime type.
type AsyncRuntime: AsyncRuntime;
}

pub type RTCSnapshotData<C> = <C as RaftTypeConfig>::SnapshotData;
pub type RTCSnapshotChunkId<C> = <<C as RaftTypeConfig>::SnapshotData as SnapshotData>::ChunkId;
pub type RTCSnapshotChunk<C> = <<C as RaftTypeConfig>::SnapshotData as SnapshotData>::Chunk;
pub type RTCSnapshotManifest<C> = <<C as RaftTypeConfig>::SnapshotData as SnapshotData>::Manifest;

0 comments on commit 50510a0

Please sign in to comment.