Skip to content

Commit

Permalink
Change: Consolidate SnapshotMeta, type parameters into C
Browse files Browse the repository at this point in the history
Upgrade tip:

To adapt to this change, update type parameters with the single generic `C` constrained by `RaftTypeConfig`:

```rust,ignore
SnapshotMeta<NID, N> --> SnapshotMeta<C>
```
  • Loading branch information
drmingdrmer committed Mar 21, 2024
1 parent 135370a commit f440bff
Show file tree
Hide file tree
Showing 20 changed files with 45 additions and 55 deletions.
4 changes: 2 additions & 2 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ openraft::declare_raft_types!(

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, ()>,
pub meta: SnapshotMeta<TypeConfig>,
pub data: Vec<u8>,
}

Expand Down Expand Up @@ -285,7 +285,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, ()>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
let new_snapshot = StoredSnapshot {
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub mod typ {
pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type SnapshotMeta = openraft::SnapshotMeta<NodeId, BasicNode>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;

Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore-generic-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Response {

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Box<typ::SnapshotData>,
Expand Down Expand Up @@ -176,7 +176,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!("install snapshot");
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub mod typ {
pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type SnapshotMeta = openraft::SnapshotMeta<NodeId, BasicNode>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;

Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct Response {

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Box<typ::SnapshotData>,
Expand Down Expand Up @@ -197,7 +197,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!("install snapshot");
Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct Response {

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
Expand Down Expand Up @@ -230,7 +230,7 @@ impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!(
Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct Response {

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
Expand Down Expand Up @@ -183,7 +183,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<NodeId>> {
tracing::info!(
Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct Response {

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeId, Node>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
Expand Down Expand Up @@ -249,7 +249,7 @@ impl RaftStateMachine<TypeConfig> for StateMachineStore {

async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<NodeId, Node>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotData>,
) -> Result<(), StorageError<NodeId>> {
let new_snapshot = StoredSnapshot {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/sm/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ pub(crate) enum Response<C>
where C: RaftTypeConfig
{
/// Build a snapshot, it returns result via the universal RaftCore response channel.
BuildSnapshot(SnapshotMeta<C::NodeId, C::Node>),
BuildSnapshot(SnapshotMeta<C>),

/// When finishing installing a snapshot.
///
/// It does not return any value to RaftCore.
InstallSnapshot(Option<SnapshotMeta<C::NodeId, C::Node>>),
InstallSnapshot(Option<SnapshotMeta<C>>),

/// Send back applied result to RaftCore.
Apply(ApplyResult<C>),
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ where C: RaftTypeConfig
/// - Engine only keeps the snapshot meta with the greatest last-log-id;
/// - and a snapshot smaller than last-committed is not allowed to be installed.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn finish_building_snapshot(&mut self, meta: SnapshotMeta<C::NodeId, C::Node>) {
pub(crate) fn finish_building_snapshot(&mut self, meta: SnapshotMeta<C>) {
tracing::info!(snapshot_meta = display(&meta), "{}", func_name!());

self.state.io_state_mut().set_building_snapshot(false);
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/snapshot_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ where C: RaftTypeConfig
///
/// [`RaftStateMachine`]: crate::storage::RaftStateMachine
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta<C::NodeId, C::Node>) -> bool {
pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta<C>) -> bool {
tracing::info!("update_snapshot: {:?}", meta);

if meta.last_log_id <= self.state.snapshot_last_log_id().copied() {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft/message/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct InstallSnapshotRequest<C: RaftTypeConfig> {
pub vote: Vote<C::NodeId>,

/// Metadata of a snapshot: snapshot_id, last_log_ed membership etc.
pub meta: SnapshotMeta<C::NodeId, C::Node>,
pub meta: SnapshotMeta<C>,

/// The byte offset where this chunk of data is positioned in the snapshot file.
pub offset: u64,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where C: RaftTypeConfig
pub membership_state: MembershipState<C::NodeId, C::Node>,

/// The metadata of the last snapshot.
pub snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
pub snapshot_meta: SnapshotMeta<C>,

// --
// -- volatile fields: they are not persisted.
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/replication/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) struct SnapshotCallback<C: RaftTypeConfig> {
pub(crate) start_time: InstantOf<C>,

/// Meta data of the snapshot to be replicated.
pub(crate) snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
pub(crate) snapshot_meta: SnapshotMeta<C>,

/// The result of the snapshot replication.
pub(crate) result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
Expand All @@ -29,7 +29,7 @@ pub(crate) struct SnapshotCallback<C: RaftTypeConfig> {
impl<C: RaftTypeConfig> SnapshotCallback<C> {
pub(in crate::replication) fn new(
start_time: InstantOf<C>,
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
snapshot_meta: SnapshotMeta<C>,
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/replication/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ where C: RaftTypeConfig
pub(crate) fn new_snapshot_callback(
request_id: RequestId,
start_time: InstantOf<C>,
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
snapshot_meta: SnapshotMeta<C>,
result: Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>,
) -> Self {
Self::SnapshotCallback(DataWithId::new(
Expand Down
40 changes: 15 additions & 25 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ pub use v2::RaftLogStorageExt;
pub use v2::RaftStateMachine;

use crate::display_ext::DisplayOption;
use crate::node::Node;
use crate::raft_types::SnapshotId;
pub use crate::storage::callback::LogApplied;
pub use crate::storage::callback::LogFlushed;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
Expand All @@ -34,27 +32,23 @@ use crate::StoredMembership;

#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct SnapshotMeta<NID, N>
where
NID: NodeId,
N: Node,
pub struct SnapshotMeta<C>
where C: RaftTypeConfig
{
/// Log entries upto which this snapshot includes, inclusive.
pub last_log_id: Option<LogId<NID>>,
pub last_log_id: Option<LogId<C::NodeId>>,

/// The last applied membership config.
pub last_membership: StoredMembership<NID, N>,
pub last_membership: StoredMembership<C::NodeId, C::Node>,

/// To identify a snapshot when transferring.
/// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be
/// different in bytes.
pub snapshot_id: SnapshotId,
}

impl<NID, N> fmt::Display for SnapshotMeta<NID, N>
where
NID: NodeId,
N: Node,
impl<C> fmt::Display for SnapshotMeta<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
Expand All @@ -67,31 +61,27 @@ where
}
}

impl<NID, N> MessageSummary<SnapshotMeta<NID, N>> for SnapshotMeta<NID, N>
where
NID: NodeId,
N: Node,
impl<C> MessageSummary<SnapshotMeta<C>> for SnapshotMeta<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
self.to_string()
}
}

impl<NID, N> SnapshotMeta<NID, N>
where
NID: NodeId,
N: Node,
impl<C> SnapshotMeta<C>
where C: RaftTypeConfig
{
pub fn signature(&self) -> SnapshotSignature<NID> {
pub fn signature(&self) -> SnapshotSignature<C::NodeId> {
SnapshotSignature {
last_log_id: self.last_log_id,
last_membership_log_id: *self.last_membership.log_id(),
snapshot_id: self.snapshot_id.clone(),
}
}

/// Returns a ref to the id of the last log that is included in this snasphot.
pub fn last_log_id(&self) -> Option<&LogId<NID>> {
/// Returns a ref to the id of the last log that is included in this snapshot.
pub fn last_log_id(&self) -> Option<&LogId<C::NodeId>> {
self.last_log_id.as_ref()
}
}
Expand All @@ -102,7 +92,7 @@ pub struct Snapshot<C>
where C: RaftTypeConfig
{
/// metadata of a snapshot
pub meta: SnapshotMeta<C::NodeId, C::Node>,
pub meta: SnapshotMeta<C>,

/// A read handle to the associated snapshot.
pub snapshot: Box<C::SnapshotData>,
Expand All @@ -112,7 +102,7 @@ impl<C> Snapshot<C>
where C: RaftTypeConfig
{
#[allow(dead_code)]
pub(crate) fn new(meta: SnapshotMeta<C::NodeId, C::Node>, snapshot: Box<C::SnapshotData>) -> Self {
pub(crate) fn new(meta: SnapshotMeta<C>, snapshot: Box<C::SnapshotData>) -> Self {
Self { meta, snapshot }
}
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ where C: RaftTypeConfig
/// snapshot.
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<C::NodeId, C::Node>,
meta: &SnapshotMeta<C>,
snapshot: Box<C::SnapshotData>,
) -> Result<(), StorageError<C::NodeId>>;

Expand Down
4 changes: 2 additions & 2 deletions stores/memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ openraft::declare_raft_types!(
/// The application snapshot type which the `MemStore` works with.
#[derive(Debug)]
pub struct MemStoreSnapshot {
pub meta: SnapshotMeta<MemNodeId, ()>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
Expand Down Expand Up @@ -484,7 +484,7 @@ impl RaftStateMachine<TypeConfig> for Arc<MemStateMachine> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<MemNodeId, ()>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<MemNodeId>> {
tracing::info!(
Expand Down
4 changes: 2 additions & 2 deletions stores/rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct RocksResponse {

#[derive(Serialize, Deserialize, Debug)]
pub struct RocksSnapshot {
pub meta: SnapshotMeta<RocksNodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
Expand Down Expand Up @@ -459,7 +459,7 @@ impl RaftStateMachine<TypeConfig> for RocksStateMachine {

async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<RocksNodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<RocksNodeId>> {
tracing::info!(
Expand Down
4 changes: 2 additions & 2 deletions stores/sledstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub struct ExampleResponse {

#[derive(Serialize, Deserialize, Debug)]
pub struct ExampleSnapshot {
pub meta: SnapshotMeta<ExampleNodeId, BasicNode>,
pub meta: SnapshotMeta<TypeConfig>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
Expand Down Expand Up @@ -641,7 +641,7 @@ impl RaftStateMachine<TypeConfig> for Arc<SledStore> {
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<ExampleNodeId, BasicNode>,
meta: &SnapshotMeta<TypeConfig>,
snapshot: Box<SnapshotDataOf<TypeConfig>>,
) -> Result<(), StorageError<ExampleNodeId>> {
tracing::info!(
Expand Down

0 comments on commit f440bff

Please sign in to comment.