-
Notifications
You must be signed in to change notification settings - Fork 161
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor: Add gRPC network kv-memstore example (#1274)
* CI: add raft-kv-memstore-grpc in ci * CI: add install protoc in steps examples
- Loading branch information
1 parent
f0d240c
commit a83ea29
Showing
19 changed files
with
1,483 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
target | ||
vendor | ||
.idea | ||
|
||
/*.log |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
[package] | ||
name = "raft-kv-memstore-grpc" | ||
version = "0.1.0" | ||
readme = "README.md" | ||
|
||
edition = "2021" | ||
authors = [ | ||
"Sainath Singineedi <[email protected]>", | ||
] | ||
categories = ["algorithms", "asynchronous", "data-structures"] | ||
description = "An example distributed key-value store built upon `openraft`." | ||
homepage = "https://github.com/databendlabs/openraft" | ||
keywords = ["raft", "consensus"] | ||
license = "MIT OR Apache-2.0" | ||
repository = "https://github.com/databendlabs/openraft" | ||
|
||
[[bin]] | ||
name = "raft-key-value" | ||
path = "src/bin/main.rs" | ||
|
||
[dependencies] | ||
memstore = { path = "../memstore", features = [] } | ||
openraft = { path = "../../openraft", features = ["serde", "type-alias"] } | ||
|
||
clap = { version = "4.1.11", features = ["derive", "env"] } | ||
serde = { version = "1.0.114", features = ["derive"] } | ||
serde_json = "1.0.57" | ||
tokio = { version = "1.0", default-features = false, features = ["sync"] } | ||
tracing = "0.1.29" | ||
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } | ||
tonic = "0.12.3" | ||
tonic-build = "0.12.3" | ||
bincode = "1.3.3" | ||
dashmap = "6.1.0" | ||
prost = "0.13.4" | ||
futures = "0.3.31" | ||
|
||
[features] | ||
|
||
[build-dependencies] | ||
prost-build = "0.13.4" | ||
tonic-build = "0.12.3" | ||
|
||
[package.metadata.docs.rs] | ||
all-features = true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
# Distributed Key-Value Store with OpenRaft and gRPC | ||
|
||
A distributed key-value store built using `openraft` and gRPC, demonstrating a robust, replicated storage system. | ||
|
||
## Modules | ||
|
||
The application is structured into key modules: | ||
|
||
- `src/bin`: Contains the `main()` function for server setup in [main.rs](./src/bin/main.rs) | ||
- `src/network`: For routing calls to their respective grpc RPCs | ||
- `src/grpc`: | ||
- `api_service.rs`: gRPC service implementations for key value store(application APIs) | ||
- `internal_service.rs`: Raft-specific gRPC internal network communication | ||
- `management_service.rs`: Administrative gRPC endpoints for cluster management | ||
- `protos`: Protocol buffers specifications for above services | ||
- `src/store`: Implements the key-value store logic in [store/mod.rs](./src/store/mod.rs) | ||
|
||
## Running the Cluster | ||
|
||
### Build the Application | ||
|
||
```shell | ||
cargo build | ||
``` | ||
|
||
### Start Nodes | ||
|
||
Start the first node: | ||
```shell | ||
./raft-key-value --id 1 --addr 127.0.0.1:21001 | ||
``` | ||
|
||
Start additional nodes by changing the `id` and `grpc-addr`: | ||
```shell | ||
./raft-key-value --id 2 --addr 127.0.0.1:21002 | ||
``` | ||
|
||
### Cluster Setup | ||
|
||
1. Initialize the first node as the leader | ||
2. Add learner nodes | ||
3. Change membership to include all nodes | ||
4. Write and read data using gRPC calls | ||
|
||
## Data Storage | ||
|
||
Data is stored in state machines, with Raft ensuring data synchronization across all nodes. | ||
See the [ExampleStateMachine](./src/store/mod.rs) for implementation details. | ||
|
||
## Cluster Management | ||
|
||
Node management process: | ||
- Store node information in the storage layer | ||
- Add nodes as learners | ||
- Promote learners to full cluster members | ||
|
||
Note: This is an example implementation and not recommended for production use. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
println!("cargo:rerun-if-changed=src/*"); | ||
let mut config = prost_build::Config::new(); | ||
config.protoc_arg("--experimental_allow_proto3_optional"); | ||
let proto_files = [ | ||
"proto/internal_service.proto", | ||
"proto/management_service.proto", | ||
"proto/api_service.proto", | ||
]; | ||
tonic_build::configure() | ||
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") | ||
.type_attribute( | ||
"openraftpb.SetRequest", | ||
"#[derive(Eq, serde::Serialize, serde::Deserialize)]", | ||
) | ||
.type_attribute( | ||
"openraftpb.Response", | ||
"#[derive(Eq, serde::Serialize, serde::Deserialize)]", | ||
) | ||
.compile_protos_with_config(config, &proto_files, &["proto"])?; | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
syntax = "proto3"; | ||
package openraftpb; | ||
|
||
// ApiService provides the key-value store API operations | ||
service ApiService { | ||
// Get retrieves the value associated with a given key | ||
rpc Get(GetRequest) returns (Response) {} | ||
|
||
// Set stores a key-value pair in the distributed store | ||
rpc Set(SetRequest) returns (Response) {} | ||
} | ||
|
||
// GetRequest represents a key lookup request | ||
message GetRequest { | ||
string key = 1; // Key to look up | ||
} | ||
|
||
// GetResponse contains the value associated with the requested key | ||
message Response { | ||
optional string value = 1; // Retrieved value | ||
} | ||
|
||
// SetRequest represents a key-value pair to be stored | ||
message SetRequest { | ||
string key = 1; // Key to store | ||
string value = 2; // Value to associate with the key | ||
} |
75 changes: 75 additions & 0 deletions
75
examples/raft-kv-memstore-grpc/proto/internal_service.proto
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
syntax = "proto3"; | ||
package openraftpb; | ||
|
||
// LeaderId represents the leader identifier in Raft | ||
message LeaderId { | ||
uint64 term = 1; | ||
uint64 node_id = 2; | ||
} | ||
|
||
// Vote represents the voting information in Raft leader election | ||
message Vote { | ||
LeaderId leader_id = 1; | ||
bool committed = 2; | ||
} | ||
|
||
// LogId represents the log identifier in Raft | ||
message LogId { | ||
uint64 index = 1; | ||
LeaderId leader_id = 2; | ||
} | ||
|
||
// VoteRequest represents a request for votes during leader election | ||
message VoteRequest { | ||
Vote vote = 1; | ||
LogId last_log_id = 2; | ||
} | ||
|
||
// VoteResponse represents the response to a vote request | ||
message VoteResponse { | ||
Vote vote = 1; | ||
bool vote_granted = 2; | ||
LogId last_log_id = 3; | ||
} | ||
|
||
// InternalService handles internal Raft cluster communication | ||
service InternalService { | ||
// Vote handles vote requests between Raft nodes during leader election | ||
rpc Vote(VoteRequest) returns (VoteResponse) {} | ||
|
||
// AppendEntries handles call related to append entries RPC | ||
rpc AppendEntries(RaftRequestBytes) returns (RaftReplyBytes) {} | ||
|
||
// Snapshot handles install snapshot RPC | ||
rpc Snapshot(stream SnapshotRequest) returns (RaftReplyBytes) {} | ||
} | ||
|
||
// RaftRequestBytes encapsulates binary Raft request data | ||
message RaftRequestBytes { | ||
bytes value = 1; // Serialized Raft request data | ||
} | ||
|
||
// RaftReplyBytes encapsulates binary Raft response data | ||
message RaftReplyBytes { | ||
bytes value = 1; // Serialized Raft response data | ||
} | ||
|
||
// The item of snapshot chunk stream. | ||
// | ||
// The first item contains `rpc_meta`, | ||
// including the application defined format of this snapshot data, | ||
// the leader vote and snapshot-meta. | ||
// | ||
// Since the second item, the `rpc_meta` should be empty and will be ignored by | ||
// the receiving end. | ||
message SnapshotRequest { | ||
|
||
// bytes serialized meta data, including vote and snapshot_meta. | ||
// ```text | ||
// (SnapshotFormat, Vote, SnapshotMeta) | ||
// ``` | ||
bytes rpc_meta = 1; | ||
|
||
// Snapshot data chunk | ||
bytes chunk = 2; | ||
} |
50 changes: 50 additions & 0 deletions
50
examples/raft-kv-memstore-grpc/proto/management_service.proto
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
syntax = "proto3"; | ||
package openraftpb; | ||
|
||
// ManagementService handles Raft cluster management operations | ||
service ManagementService { | ||
// Init initializes a new Raft cluster with the given nodes | ||
rpc Init(InitRequest) returns (RaftReplyString) {} | ||
|
||
// AddLearner adds a new learner node to the Raft cluster | ||
rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {} | ||
|
||
// ChangeMembership modifies the cluster membership configuration | ||
rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {} | ||
|
||
// Metrics retrieves cluster metrics and status information | ||
rpc Metrics(RaftRequestString) returns (RaftReplyString) {} | ||
} | ||
|
||
// InitRequest contains the initial set of nodes for cluster initialization | ||
message InitRequest { | ||
repeated Node nodes = 1; // List of initial cluster nodes | ||
} | ||
|
||
// Node represents a single node in the Raft cluster | ||
message Node { | ||
string rpc_addr = 1; // RPC address for node communication | ||
uint64 node_id = 2; // Unique identifier for the node | ||
} | ||
|
||
// AddLearnerRequest specifies parameters for adding a learner node | ||
message AddLearnerRequest { | ||
Node node = 1; // Node to be added as a learner | ||
} | ||
|
||
// RaftRequestString represents a string-based Raft request | ||
message RaftRequestString { | ||
string data = 1; // Request data in string format | ||
} | ||
|
||
// RaftReplyString represents a string-based Raft response | ||
message RaftReplyString { | ||
string data = 1; // Response data | ||
string error = 2; // Error message, if any | ||
} | ||
|
||
// ChangeMembershipRequest specifies parameters for modifying cluster membership | ||
message ChangeMembershipRequest { | ||
repeated uint64 members = 1; // New set of member node IDs | ||
bool retain = 2; // Whether to retain existing configuration | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
use std::sync::Arc; | ||
|
||
use clap::Parser; | ||
use openraft::Config; | ||
use raft_kv_memstore_grpc::grpc::api_service::ApiServiceImpl; | ||
use raft_kv_memstore_grpc::grpc::internal_service::InternalServiceImpl; | ||
use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl; | ||
use raft_kv_memstore_grpc::network::Network; | ||
use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer; | ||
use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer; | ||
use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer; | ||
use raft_kv_memstore_grpc::LogStore; | ||
use raft_kv_memstore_grpc::Raft; | ||
use raft_kv_memstore_grpc::StateMachineStore; | ||
use tonic::transport::Server; | ||
use tracing::info; | ||
|
||
#[derive(Parser, Clone, Debug)] | ||
#[clap(author, version, about, long_about = None)] | ||
pub struct Opt { | ||
#[clap(long)] | ||
pub id: u64, | ||
|
||
#[clap(long)] | ||
/// Network address to bind the server to (e.g., "127.0.0.1:50051") | ||
pub addr: String, | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
// Initialize tracing first, before any logging happens | ||
tracing_subscriber::fmt() | ||
.with_max_level(tracing::Level::INFO) | ||
.with_file(true) | ||
.with_line_number(true) | ||
.init(); | ||
|
||
// Parse the parameters passed by arguments. | ||
let options = Opt::parse(); | ||
let node_id = options.id; | ||
let addr = options.addr; | ||
|
||
// Create a configuration for the raft instance. | ||
let config = Arc::new( | ||
Config { | ||
heartbeat_interval: 500, | ||
election_timeout_min: 1500, | ||
election_timeout_max: 3000, | ||
..Default::default() | ||
} | ||
.validate()?, | ||
); | ||
|
||
// Create stores and network | ||
let log_store = LogStore::default(); | ||
let state_machine_store = Arc::new(StateMachineStore::default()); | ||
let network = Network {}; | ||
|
||
// Create Raft instance | ||
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?; | ||
|
||
// Create the management service with raft instance | ||
let management_service = ManagementServiceImpl::new(raft.clone()); | ||
let internal_service = InternalServiceImpl::new(raft.clone()); | ||
let api_service = ApiServiceImpl::new(raft, state_machine_store); | ||
|
||
// Start server | ||
let server_future = Server::builder() | ||
.add_service(ManagementServiceServer::new(management_service)) | ||
.add_service(InternalServiceServer::new(internal_service)) | ||
.add_service(ApiServiceServer::new(api_service)) | ||
.serve(addr.parse()?); | ||
|
||
info!("Node {node_id} starting server at {addr}"); | ||
server_future.await?; | ||
|
||
Ok(()) | ||
} |
Oops, something went wrong.